diff --git a/Gopkg.lock b/Gopkg.lock index 907ab5fca06..c4018435c59 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -96,6 +96,14 @@ revision = "5c37fe3735342a2e0d01c87a907579987c8936cc" version = "v1.0.0" +[[projects]] + branch = "master" + digest = "1:aa8f6e1219c8c1bf569ded1c45485f4cf6900b0a99a2a17e253d8dde6515da43" + name = "github.com/cockroachdb/cmux" + packages = ["."] + pruneopts = "" + revision = "30d10be492927e2dcae0089c374c455d42414fcb" + [[projects]] branch = "master" digest = "1:c367c68b4bf22ef91069ae442422025da1a3a57049b370252a7b4a895c3fdd6b" @@ -401,6 +409,14 @@ revision = "4e5d6424981844faafc4b0649036b2e0395bdf99" version = "v0.2.0" +[[projects]] + branch = "master" + digest = "1:d4191af8b10083895f25011ae7b17c09db4794147fe3070b2ebc932de662624a" + name = "github.com/mwitkow/go-conntrack" + packages = ["."] + pruneopts = "" + revision = "cc309e4a22231782e8893f3c35ced0967807a33e" + [[projects]] digest = "1:94e9081cc450d2cdf4e6886fc2c06c07272f86477df2d74ee5931951fa3d2577" name = "github.com/oklog/run" @@ -425,6 +441,14 @@ revision = "e6d60cf7ba1f42d86d54cdf5508611c4aafb3970" version = "v0.0.1" +[[projects]] + branch = "master" + digest = "1:457e347f92110ed9d83f034ce686679dca8a87c1e4294b41a8525d0b389893bb" + name = "github.com/opentracing-contrib/go-stdlib" + packages = ["nethttp"] + pruneopts = "" + revision = "77df8e8e70b403c6b13c0fffaa4867c9044ff4e9" + [[projects]] digest = "1:171ac7b583c9e4f28dfd3c310fdef0802a9db6afa066ab71e2134ff2cfb646d7" name = "github.com/opentracing/basictracer-go" @@ -537,6 +561,7 @@ "fileutil", "index", "labels", + "testutil", "wal", ] pruneopts = "" @@ -614,6 +639,7 @@ "internal/timeseries", "ipv4", "ipv6", + "netutil", "trace", ] pruneopts = "" @@ -813,6 +839,7 @@ "github.com/NYTimes/gziphandler", "github.com/armon/go-metrics", "github.com/armon/go-metrics/prometheus", + "github.com/cockroachdb/cmux", "github.com/fatih/structtag", "github.com/fortytw2/leaktest", "github.com/fsnotify/fsnotify", @@ -838,9 +865,11 @@ "github.com/minio/minio-go/pkg/credentials", "github.com/minio/minio-go/pkg/encrypt", "github.com/mozillazg/go-cos", + "github.com/mwitkow/go-conntrack", "github.com/oklog/run", "github.com/oklog/ulid", "github.com/olekukonko/tablewriter", + "github.com/opentracing-contrib/go-stdlib/nethttp", "github.com/opentracing/basictracer-go", "github.com/opentracing/opentracing-go", "github.com/opentracing/opentracing-go/ext", @@ -867,7 +896,9 @@ "github.com/prometheus/tsdb/fileutil", "github.com/prometheus/tsdb/index", "github.com/prometheus/tsdb/labels", + "github.com/prometheus/tsdb/testutil", "golang.org/x/net/context", + "golang.org/x/net/netutil", "golang.org/x/sync/errgroup", "golang.org/x/text/language", "golang.org/x/text/message", diff --git a/cmd/thanos/main.go b/cmd/thanos/main.go index b0bb1f75d36..629075bf52b 100644 --- a/cmd/thanos/main.go +++ b/cmd/thanos/main.go @@ -78,6 +78,7 @@ func main() { registerCompact(cmds, app, "compact") registerBucket(cmds, app, "bucket") registerDownsample(cmds, app, "downsample") + registerRemoteWriteReceive(cmds, app, "remote-write-receive") cmd, err := app.Parse(os.Args[1:]) if err != nil { diff --git a/cmd/thanos/remote-write-receive.go b/cmd/thanos/remote-write-receive.go new file mode 100644 index 00000000000..0470ea5450f --- /dev/null +++ b/cmd/thanos/remote-write-receive.go @@ -0,0 +1,281 @@ +package main + +import ( + "context" + "fmt" + "math" + "net" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/improbable-eng/thanos/pkg/cluster" + "github.com/improbable-eng/thanos/pkg/receive" + "github.com/improbable-eng/thanos/pkg/runutil" + "github.com/improbable-eng/thanos/pkg/store" + "github.com/improbable-eng/thanos/pkg/store/storepb" + "github.com/oklog/run" + opentracing "github.com/opentracing/opentracing-go" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/storage/tsdb" + "google.golang.org/grpc" + kingpin "gopkg.in/alecthomas/kingpin.v2" +) + +func registerRemoteWriteReceive(m map[string]setupFunc, app *kingpin.Application, name string) { + cmd := app.Command(name, "accept Prometheus remote write API requests (EXPERIMENTAL, this may change drastically without notice)") + + grpcBindAddr, httpMetricsBindAddr, cert, key, clientCA, newPeerFn := regCommonServerFlags(cmd) + + remoteWriteAddress := cmd.Flag("remote-write.address", "Address to listen on for remote write requests."). + Default("0.0.0.0:19291").String() + + dataDir := cmd.Flag("tsdb.path", "Data directory of TSDB."). + Default("./data").String() + + m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error { + peer, err := newPeerFn(logger, reg, false, "", false) + if err != nil { + return errors.Wrap(err, "new cluster peer") + } + return runReceiver( + g, + logger, + reg, + tracer, + *grpcBindAddr, + *cert, + *key, + *clientCA, + *httpMetricsBindAddr, + *remoteWriteAddress, + *dataDir, + peer, + name, + ) + } +} + +func runReceiver( + g *run.Group, + logger log.Logger, + reg *prometheus.Registry, + tracer opentracing.Tracer, + grpcBindAddr string, + cert string, + key string, + clientCA string, + httpMetricsBindAddr string, + remoteWriteAddress string, + dataDir string, + peer cluster.Peer, + component string, +) error { + level.Info(logger).Log("msg", "setting up receiver") + + var metadata = &promMetadata{ + // Start out with the full time range. The shipper will constrain it later. + // TODO(fabxc): minimum timestamp is never adjusted if shipping is disabled. + mint: 0, + maxt: math.MaxInt64, + } + + tsdbCfg := &tsdb.Options{ + Retention: model.Duration(time.Hour * 24 * 15), + NoLockfile: true, + MinBlockDuration: model.Duration(time.Hour * 2), + MaxBlockDuration: model.Duration(time.Hour * 2), + } + + ctxWeb, cancelWeb := context.WithCancel(context.Background()) + localStorage := &tsdb.ReadyStorage{} + receiver := receive.NewReceiveWriter(log.With(logger, "component", "receive-writer"), localStorage) + webHandler := receive.NewHandler(log.With(logger, "component", "remote-receive-handler"), &receive.Options{ + Context: ctxWeb, + Receiver: receiver, + ListenAddress: remoteWriteAddress, + MaxConnections: 1000, + ReadTimeout: time.Minute * 5, + Registry: reg, + ReadyStorage: localStorage, + }) + + // Start all components while we wait for TSDB to open but only load + // initial config and mark ourselves as ready after it completed. + dbOpen := make(chan struct{}) + + // sync.Once is used to make sure we can close the channel at different execution stages(SIGTERM or when the config is loaded). + type closeOnce struct { + C chan struct{} + once sync.Once + Close func() + } + // Wait until the server is ready to handle reloading. + reloadReady := &closeOnce{ + C: make(chan struct{}), + } + reloadReady.Close = func() { + reloadReady.once.Do(func() { + close(reloadReady.C) + }) + } + + level.Debug(logger).Log("msg", "setting up joining cluster") + { + ctx, cancel := context.WithCancel(context.Background()) + g.Add(func() error { + mint, maxt := metadata.Timestamps() + if peer != nil { + if err := peer.Join(cluster.PeerTypeSource, cluster.PeerMetadata{ + Labels: metadata.LabelsPB(), + MinTime: mint, + MaxTime: maxt, + }); err != nil { + return errors.Wrap(err, "join cluster") + } + } + + <-ctx.Done() + return nil + }, func(err error) { + level.Debug(logger).Log("msg", "mesh group errored", "err", err) + cancel() + peer.Close(2 * time.Second) + }) + } + + level.Debug(logger).Log("msg", "setting up endpoint readiness") + { + // Initial configuration loading. + cancel := make(chan struct{}) + g.Add( + func() error { + select { + case <-dbOpen: + break + // In case a shutdown is initiated before the dbOpen is released + case <-cancel: + reloadReady.Close() + return nil + } + + reloadReady.Close() + + webHandler.Ready() + level.Info(logger).Log("msg", "Server is ready to receive web requests.") + <-cancel + return nil + }, + func(err error) { + level.Debug(logger).Log("msg", "initial load group errored", "err", err) + close(cancel) + }, + ) + } + + level.Debug(logger).Log("msg", "setting up tsdb") + { + // TSDB. + cancel := make(chan struct{}) + g.Add( + func() error { + level.Info(logger).Log("msg", "Starting TSDB ...") + db, err := tsdb.Open( + dataDir, + log.With(logger, "component", "tsdb"), + reg, + tsdbCfg, + ) + if err != nil { + return fmt.Errorf("opening storage failed: %s", err) + } + level.Info(logger).Log("msg", "TSDB started") + + startTimeMargin := int64(2 * time.Duration(tsdbCfg.MinBlockDuration).Seconds() * 1000) + localStorage.Set(db, startTimeMargin) + close(dbOpen) + <-cancel + return nil + }, + func(err error) { + level.Debug(logger).Log("msg", "tsdb group errored", "err", err) + if err := localStorage.Close(); err != nil { + level.Error(logger).Log("msg", "Error stopping storage", "err", err) + } + close(cancel) + }, + ) + } + + level.Debug(logger).Log("msg", "setting up metric http listen-group") + if err := metricHTTPListenGroup(g, logger, reg, httpMetricsBindAddr); err != nil { + level.Debug(logger).Log("msg", "metric listener errored", "err", err) + return err + } + + level.Debug(logger).Log("msg", "setting up grpc server") + { + var ( + logger = log.With(logger, "component", "receiver") + + s *grpc.Server + l net.Listener + err error + ) + g.Add(func() error { + select { + case <-dbOpen: + break + } + + l, err = net.Listen("tcp", grpcBindAddr) + if err != nil { + return errors.Wrap(err, "listen API address") + } + + db := localStorage.Get() + tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), reg, db, nil) + + opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA) + if err != nil { + return errors.Wrap(err, "setup gRPC server") + } + s = grpc.NewServer(opts...) + storepb.RegisterStoreServer(s, tsdbStore) + + level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr) + return errors.Wrap(s.Serve(l), "serve gRPC") + }, func(error) { + level.Debug(logger).Log("msg", "grpc group errored", "err", err) + if s != nil { + s.Stop() + } + if l != nil { + runutil.CloseWithLogOnErr(logger, l, "store gRPC listener") + } + }) + } + + level.Debug(logger).Log("msg", "setting up receive http handler") + { + // Web handler. + g.Add( + func() error { + if err := webHandler.Run(ctxWeb); err != nil { + return fmt.Errorf("error starting web server: %s", err) + } + return nil + }, + func(err error) { + level.Debug(logger).Log("msg", "receive http handler errored", "err", err) + cancelWeb() + }, + ) + } + level.Info(logger).Log("msg", "starting receiver", "peer", peer.Name()) + + return nil +} diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go new file mode 100644 index 00000000000..bcc50a7f023 --- /dev/null +++ b/pkg/receive/handler.go @@ -0,0 +1,220 @@ +package receive + +import ( + "context" + "fmt" + "io/ioutil" + stdlog "log" + "net" + "net/http" + "runtime/debug" + "sync/atomic" + "time" + + "github.com/cockroachdb/cmux" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" + "github.com/improbable-eng/thanos/pkg/store/prompb" + conntrack "github.com/mwitkow/go-conntrack" + "github.com/opentracing-contrib/go-stdlib/nethttp" + opentracing "github.com/opentracing/opentracing-go" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/prometheus/common/route" + "github.com/prometheus/prometheus/storage/tsdb" + "golang.org/x/net/netutil" +) + +var ( + requestDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "prometheus_http_request_duration_seconds", + Help: "Histogram of latencies for HTTP requests.", + Buckets: []float64{.1, .2, .4, 1, 3, 8, 20, 60, 120}, + }, + []string{"handler"}, + ) + responseSize = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "prometheus_http_response_size_bytes", + Help: "Histogram of response size for HTTP requests.", + Buckets: prometheus.ExponentialBuckets(100, 10, 8), + }, + []string{"handler"}, + ) +) + +// Options for the web Handler. +type Options struct { + Receiver *ReceiveWriter + Context context.Context + ListenAddress string + MaxConnections int + ReadTimeout time.Duration + Registry prometheus.Registerer + ReadyStorage *tsdb.ReadyStorage +} + +// Handler serves various HTTP endpoints of the Prometheus server +type Handler struct { + readyStorage *tsdb.ReadyStorage + context context.Context + logger log.Logger + receiver *ReceiveWriter + router *route.Router + options *Options + quitCh chan struct{} + + ready uint32 // ready is uint32 rather than boolean to be able to use atomic functions. +} + +func instrumentHandler(handlerName string, handler http.HandlerFunc) http.HandlerFunc { + return promhttp.InstrumentHandlerDuration( + requestDuration.MustCurryWith(prometheus.Labels{"handler": handlerName}), + promhttp.InstrumentHandlerResponseSize( + responseSize.MustCurryWith(prometheus.Labels{"handler": handlerName}), + handler, + ), + ) +} + +func NewHandler(logger log.Logger, o *Options) *Handler { + router := route.New().WithInstrumentation(instrumentHandler) + if logger == nil { + logger = log.NewNopLogger() + } + + h := &Handler{ + logger: logger, + router: router, + readyStorage: o.ReadyStorage, + context: o.Context, + receiver: o.Receiver, + options: o, + quitCh: make(chan struct{}), + } + + readyf := h.testReady + router.Post("/receive", readyf(h.receive)) + + return h +} + +// Ready sets Handler to be ready. +func (h *Handler) Ready() { + atomic.StoreUint32(&h.ready, 1) +} + +// Verifies whether the server is ready or not. +func (h *Handler) isReady() bool { + ready := atomic.LoadUint32(&h.ready) + return ready > 0 +} + +// Checks if server is ready, calls f if it is, returns 503 if it is not. +func (h *Handler) testReady(f http.HandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if h.isReady() { + f(w, r) + } else { + w.WriteHeader(http.StatusServiceUnavailable) + fmt.Fprintf(w, "Service Unavailable") + } + } +} + +// Quit returns the receive-only quit channel. +func (h *Handler) Quit() <-chan struct{} { + return h.quitCh +} + +// Checks if server is ready, calls f if it is, returns 503 if it is not. +func (h *Handler) testReadyHandler(f http.Handler) http.HandlerFunc { + return h.testReady(f.ServeHTTP) +} + +// Run serves the HTTP endpoints. +func (h *Handler) Run(ctx context.Context) error { + level.Info(h.logger).Log("msg", "Start listening for connections", "address", h.options.ListenAddress) + + listener, err := net.Listen("tcp", h.options.ListenAddress) + if err != nil { + return err + } + listener = netutil.LimitListener(listener, h.options.MaxConnections) + + // Monitor incoming connections with conntrack. + listener = conntrack.NewListener(listener, + conntrack.TrackWithName("http"), + conntrack.TrackWithTracing()) + + var ( + m = cmux.New(listener) + httpl = m.Match(cmux.HTTP1Fast()) + ) + + operationName := nethttp.OperationNameFunc(func(r *http.Request) string { + return fmt.Sprintf("%s %s", r.Method, r.URL.Path) + }) + mux := http.NewServeMux() + mux.Handle("/", h.router) + + errlog := stdlog.New(log.NewStdlibAdapter(level.Error(h.logger)), "", 0) + + httpSrv := &http.Server{ + Handler: nethttp.Middleware(opentracing.GlobalTracer(), mux, operationName), + ErrorLog: errlog, + ReadTimeout: h.options.ReadTimeout, + } + + errCh := make(chan error) + go func() { + errCh <- httpSrv.Serve(httpl) + }() + go func() { + errCh <- m.Serve() + }() + + select { + case e := <-errCh: + return e + case <-ctx.Done(): + httpSrv.Shutdown(ctx) + return nil + } +} + +func (h *Handler) receive(w http.ResponseWriter, req *http.Request) { + defer func() { + if r := recover(); r != nil { + fmt.Println("panic recovered:", r, string(debug.Stack())) + } + }() + + compressed, err := ioutil.ReadAll(req.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + reqBuf, err := snappy.Decode(nil, compressed) + if err != nil { + fmt.Println("snappy decode error") + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + var wreq prompb.WriteRequest + if err := proto.Unmarshal(reqBuf, &wreq); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + err = h.receiver.Receive(&wreq) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } +} diff --git a/pkg/receive/receivewriter.go b/pkg/receive/receivewriter.go new file mode 100644 index 00000000000..03b84261071 --- /dev/null +++ b/pkg/receive/receivewriter.go @@ -0,0 +1,57 @@ +package receive + +import ( + "github.com/go-kit/kit/log" + "github.com/improbable-eng/thanos/pkg/store/prompb" + "github.com/pkg/errors" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" +) + +// Appendable returns an Appender. +type Appendable interface { + Appender() (storage.Appender, error) +} + +type ReceiveWriter struct { + logger log.Logger + append Appendable +} + +func NewReceiveWriter(logger log.Logger, app Appendable) *ReceiveWriter { + return &ReceiveWriter{ + logger: logger, + append: app, + } +} + +func (r *ReceiveWriter) Receive(wreq *prompb.WriteRequest) error { + app, err := r.append.Appender() + if err != nil { + return errors.Wrap(err, "failed to get appender") + } + + for _, t := range wreq.Timeseries { + lset := make(labels.Labels, len(t.Labels)) + for j := range t.Labels { + lset[j] = labels.Label{ + Name: t.Labels[j].Name, + Value: t.Labels[j].Value, + } + } + + for _, s := range t.Samples { + _, err = app.Add(lset, s.Timestamp, s.Value) + if err != nil { + return errors.Wrap(err, "failed to non-fast add") + } + } + } + + if err := app.Commit(); err != nil { + return errors.Wrap(err, "failed to commit") + } + + return nil +} diff --git a/pkg/store/prompb/remote.pb.go b/pkg/store/prompb/remote.pb.go index 590290c658b..06a0952c98e 100644 --- a/pkg/store/prompb/remote.pb.go +++ b/pkg/store/prompb/remote.pb.go @@ -49,9 +49,49 @@ func (x LabelMatcher_Type) String() string { return proto.EnumName(LabelMatcher_Type_name, int32(x)) } func (LabelMatcher_Type) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_remote_5645ea049238b205, []int{7, 0} + return fileDescriptor_remote_930be8df34ca631b, []int{8, 0} } +type WriteRequest struct { + Timeseries []TimeSeries `protobuf:"bytes,1,rep,name=timeseries" json:"timeseries"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *WriteRequest) Reset() { *m = WriteRequest{} } +func (m *WriteRequest) String() string { return proto.CompactTextString(m) } +func (*WriteRequest) ProtoMessage() {} +func (*WriteRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_remote_930be8df34ca631b, []int{0} +} +func (m *WriteRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *WriteRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_WriteRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (dst *WriteRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_WriteRequest.Merge(dst, src) +} +func (m *WriteRequest) XXX_Size() int { + return m.Size() +} +func (m *WriteRequest) XXX_DiscardUnknown() { + xxx_messageInfo_WriteRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_WriteRequest proto.InternalMessageInfo + type ReadRequest struct { Queries []Query `protobuf:"bytes,1,rep,name=queries" json:"queries"` XXX_NoUnkeyedLiteral struct{} `json:"-"` @@ -63,7 +103,7 @@ func (m *ReadRequest) Reset() { *m = ReadRequest{} } func (m *ReadRequest) String() string { return proto.CompactTextString(m) } func (*ReadRequest) ProtoMessage() {} func (*ReadRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_remote_5645ea049238b205, []int{0} + return fileDescriptor_remote_930be8df34ca631b, []int{1} } func (m *ReadRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -104,7 +144,7 @@ func (m *ReadResponse) Reset() { *m = ReadResponse{} } func (m *ReadResponse) String() string { return proto.CompactTextString(m) } func (*ReadResponse) ProtoMessage() {} func (*ReadResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_remote_5645ea049238b205, []int{1} + return fileDescriptor_remote_930be8df34ca631b, []int{2} } func (m *ReadResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -137,6 +177,7 @@ type Query struct { StartTimestampMs int64 `protobuf:"varint,1,opt,name=start_timestamp_ms,json=startTimestampMs,proto3" json:"start_timestamp_ms,omitempty"` EndTimestampMs int64 `protobuf:"varint,2,opt,name=end_timestamp_ms,json=endTimestampMs,proto3" json:"end_timestamp_ms,omitempty"` Matchers []LabelMatcher `protobuf:"bytes,3,rep,name=matchers" json:"matchers"` + Hints *ReadHints `protobuf:"bytes,4,opt,name=hints" json:"hints,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -146,7 +187,7 @@ func (m *Query) Reset() { *m = Query{} } func (m *Query) String() string { return proto.CompactTextString(m) } func (*Query) ProtoMessage() {} func (*Query) Descriptor() ([]byte, []int) { - return fileDescriptor_remote_5645ea049238b205, []int{2} + return fileDescriptor_remote_930be8df34ca631b, []int{3} } func (m *Query) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -186,7 +227,7 @@ func (m *QueryResult) Reset() { *m = QueryResult{} } func (m *QueryResult) String() string { return proto.CompactTextString(m) } func (*QueryResult) ProtoMessage() {} func (*QueryResult) Descriptor() ([]byte, []int) { - return fileDescriptor_remote_5645ea049238b205, []int{3} + return fileDescriptor_remote_930be8df34ca631b, []int{4} } func (m *QueryResult) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -227,7 +268,7 @@ func (m *Sample) Reset() { *m = Sample{} } func (m *Sample) String() string { return proto.CompactTextString(m) } func (*Sample) ProtoMessage() {} func (*Sample) Descriptor() ([]byte, []int) { - return fileDescriptor_remote_5645ea049238b205, []int{4} + return fileDescriptor_remote_930be8df34ca631b, []int{5} } func (m *Sample) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -268,7 +309,7 @@ func (m *TimeSeries) Reset() { *m = TimeSeries{} } func (m *TimeSeries) String() string { return proto.CompactTextString(m) } func (*TimeSeries) ProtoMessage() {} func (*TimeSeries) Descriptor() ([]byte, []int) { - return fileDescriptor_remote_5645ea049238b205, []int{5} + return fileDescriptor_remote_930be8df34ca631b, []int{6} } func (m *TimeSeries) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -309,7 +350,7 @@ func (m *Label) Reset() { *m = Label{} } func (m *Label) String() string { return proto.CompactTextString(m) } func (*Label) ProtoMessage() {} func (*Label) Descriptor() ([]byte, []int) { - return fileDescriptor_remote_5645ea049238b205, []int{6} + return fileDescriptor_remote_930be8df34ca631b, []int{7} } func (m *Label) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -352,7 +393,7 @@ func (m *LabelMatcher) Reset() { *m = LabelMatcher{} } func (m *LabelMatcher) String() string { return proto.CompactTextString(m) } func (*LabelMatcher) ProtoMessage() {} func (*LabelMatcher) Descriptor() ([]byte, []int) { - return fileDescriptor_remote_5645ea049238b205, []int{7} + return fileDescriptor_remote_930be8df34ca631b, []int{8} } func (m *LabelMatcher) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -381,7 +422,51 @@ func (m *LabelMatcher) XXX_DiscardUnknown() { var xxx_messageInfo_LabelMatcher proto.InternalMessageInfo +type ReadHints struct { + StepMs int64 `protobuf:"varint,1,opt,name=step_ms,json=stepMs,proto3" json:"step_ms,omitempty"` + Func string `protobuf:"bytes,2,opt,name=func,proto3" json:"func,omitempty"` + StartMs int64 `protobuf:"varint,3,opt,name=start_ms,json=startMs,proto3" json:"start_ms,omitempty"` + EndMs int64 `protobuf:"varint,4,opt,name=end_ms,json=endMs,proto3" json:"end_ms,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ReadHints) Reset() { *m = ReadHints{} } +func (m *ReadHints) String() string { return proto.CompactTextString(m) } +func (*ReadHints) ProtoMessage() {} +func (*ReadHints) Descriptor() ([]byte, []int) { + return fileDescriptor_remote_930be8df34ca631b, []int{9} +} +func (m *ReadHints) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ReadHints) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ReadHints.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (dst *ReadHints) XXX_Merge(src proto.Message) { + xxx_messageInfo_ReadHints.Merge(dst, src) +} +func (m *ReadHints) XXX_Size() int { + return m.Size() +} +func (m *ReadHints) XXX_DiscardUnknown() { + xxx_messageInfo_ReadHints.DiscardUnknown(m) +} + +var xxx_messageInfo_ReadHints proto.InternalMessageInfo + func init() { + proto.RegisterType((*WriteRequest)(nil), "prometheus.WriteRequest") proto.RegisterType((*ReadRequest)(nil), "prometheus.ReadRequest") proto.RegisterType((*ReadResponse)(nil), "prometheus.ReadResponse") proto.RegisterType((*Query)(nil), "prometheus.Query") @@ -390,8 +475,42 @@ func init() { proto.RegisterType((*TimeSeries)(nil), "prometheus.TimeSeries") proto.RegisterType((*Label)(nil), "prometheus.Label") proto.RegisterType((*LabelMatcher)(nil), "prometheus.LabelMatcher") + proto.RegisterType((*ReadHints)(nil), "prometheus.ReadHints") proto.RegisterEnum("prometheus.LabelMatcher_Type", LabelMatcher_Type_name, LabelMatcher_Type_value) } +func (m *WriteRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *WriteRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Timeseries) > 0 { + for _, msg := range m.Timeseries { + dAtA[i] = 0xa + i++ + i = encodeVarintRemote(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + if m.XXX_unrecognized != nil { + i += copy(dAtA[i:], m.XXX_unrecognized) + } + return i, nil +} + func (m *ReadRequest) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -495,6 +614,16 @@ func (m *Query) MarshalTo(dAtA []byte) (int, error) { i += n } } + if m.Hints != nil { + dAtA[i] = 0x22 + i++ + i = encodeVarintRemote(dAtA, i, uint64(m.Hints.Size())) + n1, err := m.Hints.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n1 + } if m.XXX_unrecognized != nil { i += copy(dAtA[i:], m.XXX_unrecognized) } @@ -682,6 +811,48 @@ func (m *LabelMatcher) MarshalTo(dAtA []byte) (int, error) { return i, nil } +func (m *ReadHints) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ReadHints) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.StepMs != 0 { + dAtA[i] = 0x8 + i++ + i = encodeVarintRemote(dAtA, i, uint64(m.StepMs)) + } + if len(m.Func) > 0 { + dAtA[i] = 0x12 + i++ + i = encodeVarintRemote(dAtA, i, uint64(len(m.Func))) + i += copy(dAtA[i:], m.Func) + } + if m.StartMs != 0 { + dAtA[i] = 0x18 + i++ + i = encodeVarintRemote(dAtA, i, uint64(m.StartMs)) + } + if m.EndMs != 0 { + dAtA[i] = 0x20 + i++ + i = encodeVarintRemote(dAtA, i, uint64(m.EndMs)) + } + if m.XXX_unrecognized != nil { + i += copy(dAtA[i:], m.XXX_unrecognized) + } + return i, nil +} + func encodeVarintRemote(dAtA []byte, offset int, v uint64) int { for v >= 1<<7 { dAtA[offset] = uint8(v&0x7f | 0x80) @@ -691,6 +862,21 @@ func encodeVarintRemote(dAtA []byte, offset int, v uint64) int { dAtA[offset] = uint8(v) return offset + 1 } +func (m *WriteRequest) Size() (n int) { + var l int + _ = l + if len(m.Timeseries) > 0 { + for _, e := range m.Timeseries { + l = e.Size() + n += 1 + l + sovRemote(uint64(l)) + } + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + func (m *ReadRequest) Size() (n int) { var l int _ = l @@ -736,6 +922,10 @@ func (m *Query) Size() (n int) { n += 1 + l + sovRemote(uint64(l)) } } + if m.Hints != nil { + l = m.Hints.Size() + n += 1 + l + sovRemote(uint64(l)) + } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -830,6 +1020,28 @@ func (m *LabelMatcher) Size() (n int) { return n } +func (m *ReadHints) Size() (n int) { + var l int + _ = l + if m.StepMs != 0 { + n += 1 + sovRemote(uint64(m.StepMs)) + } + l = len(m.Func) + if l > 0 { + n += 1 + l + sovRemote(uint64(l)) + } + if m.StartMs != 0 { + n += 1 + sovRemote(uint64(m.StartMs)) + } + if m.EndMs != 0 { + n += 1 + sovRemote(uint64(m.EndMs)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + func sovRemote(x uint64) (n int) { for { n++ @@ -843,6 +1055,88 @@ func sovRemote(x uint64) (n int) { func sozRemote(x uint64) (n int) { return sovRemote(uint64((x << 1) ^ uint64((int64(x) >> 63)))) } +func (m *WriteRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: WriteRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: WriteRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Timeseries", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRemote + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Timeseries = append(m.Timeseries, TimeSeries{}) + if err := m.Timeseries[len(m.Timeseries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRemote(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRemote + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *ReadRequest) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -1105,6 +1399,39 @@ func (m *Query) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Hints", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRemote + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Hints == nil { + m.Hints = &ReadHints{} + } + if err := m.Hints.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipRemote(dAtA[iNdEx:]) @@ -1640,6 +1967,143 @@ func (m *LabelMatcher) Unmarshal(dAtA []byte) error { } return nil } +func (m *ReadHints) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ReadHints: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ReadHints: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field StepMs", wireType) + } + m.StepMs = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.StepMs |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Func", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRemote + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Func = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field StartMs", wireType) + } + m.StartMs = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.StartMs |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field EndMs", wireType) + } + m.EndMs = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.EndMs |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipRemote(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRemote + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipRemote(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 @@ -1745,36 +2209,42 @@ var ( ErrIntOverflowRemote = fmt.Errorf("proto: integer overflow") ) -func init() { proto.RegisterFile("remote.proto", fileDescriptor_remote_5645ea049238b205) } - -var fileDescriptor_remote_5645ea049238b205 = []byte{ - // 448 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x93, 0xc1, 0x8a, 0x13, 0x41, - 0x10, 0x86, 0xd3, 0x33, 0xc9, 0xc4, 0xad, 0x84, 0x65, 0x2c, 0x16, 0x0d, 0xa2, 0x51, 0xe6, 0x94, - 0x83, 0x64, 0x49, 0x3c, 0x08, 0xb2, 0x07, 0x59, 0x08, 0x1e, 0x74, 0x85, 0xf4, 0xee, 0xc9, 0xcb, - 0x32, 0x31, 0xc5, 0xee, 0xc2, 0x4c, 0x66, 0xd2, 0xdd, 0x23, 0xe4, 0x41, 0x3c, 0xf9, 0x42, 0x39, - 0xfa, 0x04, 0xa2, 0x79, 0x12, 0xe9, 0xea, 0x99, 0xa4, 0xc5, 0xdd, 0x5b, 0x77, 0xd5, 0x57, 0x7f, - 0xfd, 0x55, 0x4d, 0x43, 0x5f, 0x51, 0x5e, 0x18, 0x1a, 0x97, 0xaa, 0x30, 0x05, 0x42, 0xa9, 0x8a, - 0x9c, 0xcc, 0x2d, 0x55, 0xfa, 0xd9, 0xc9, 0x4d, 0x71, 0x53, 0x70, 0xf8, 0xd4, 0x9e, 0x1c, 0x91, - 0xbc, 0x87, 0x9e, 0xa4, 0x74, 0x29, 0x69, 0x5d, 0x91, 0x36, 0x38, 0x81, 0xee, 0xba, 0x22, 0x75, - 0x47, 0x7a, 0x20, 0x5e, 0x85, 0xa3, 0xde, 0xf4, 0xf1, 0xf8, 0x20, 0x31, 0x9e, 0x57, 0xa4, 0x36, - 0xe7, 0xed, 0xed, 0xaf, 0x97, 0x2d, 0xd9, 0x70, 0xc9, 0x07, 0xe8, 0x3b, 0x05, 0x5d, 0x16, 0x2b, - 0x4d, 0xf8, 0x16, 0xba, 0x8a, 0x74, 0x95, 0x99, 0x46, 0xe2, 0xe9, 0x7f, 0x12, 0x92, 0xf3, 0x8d, - 0x50, 0x4d, 0x27, 0x3f, 0x04, 0x74, 0x38, 0x8d, 0xaf, 0x01, 0xb5, 0x49, 0x95, 0xb9, 0x36, 0x77, - 0x39, 0x69, 0x93, 0xe6, 0xe5, 0x75, 0x6e, 0xd5, 0xc4, 0x28, 0x94, 0x31, 0x67, 0xae, 0x9a, 0xc4, - 0x85, 0xc6, 0x11, 0xc4, 0xb4, 0x5a, 0xfe, 0xcb, 0x06, 0xcc, 0x1e, 0xd3, 0x6a, 0xe9, 0x93, 0xef, - 0xe0, 0x51, 0x9e, 0x9a, 0xaf, 0xb7, 0xa4, 0xf4, 0x20, 0x64, 0x6f, 0x03, 0xdf, 0xdb, 0xa7, 0x74, - 0x41, 0xd9, 0x85, 0x03, 0x6a, 0x73, 0x7b, 0x3e, 0xf9, 0x08, 0x3d, 0xcf, 0x3b, 0x9e, 0x01, 0x70, - 0x43, 0x7f, 0x57, 0x4f, 0x7c, 0x31, 0xdb, 0xf7, 0x92, 0xb3, 0xb5, 0x94, 0xc7, 0x27, 0x67, 0x10, - 0x5d, 0xa6, 0x79, 0x99, 0x11, 0x9e, 0x40, 0xe7, 0x5b, 0x9a, 0x55, 0xc4, 0xd3, 0x09, 0xe9, 0x2e, - 0xf8, 0x1c, 0x8e, 0xf6, 0xe3, 0xd4, 0xb3, 0x1c, 0x02, 0xc9, 0x1a, 0xe0, 0xa0, 0x8e, 0xa7, 0x10, - 0x65, 0xd6, 0xf8, 0xbd, 0x2f, 0xc6, 0x23, 0xd5, 0x06, 0x6a, 0x0c, 0xa7, 0xd0, 0xd5, 0xdc, 0xdc, - 0xae, 0xc9, 0x56, 0xa0, 0x5f, 0xe1, 0x7c, 0x35, 0x6f, 0x53, 0x83, 0xc9, 0x04, 0x3a, 0x2c, 0x85, - 0x08, 0xed, 0x55, 0x9a, 0x3b, 0xbb, 0x47, 0x92, 0xcf, 0x87, 0x19, 0x02, 0x0e, 0xba, 0x4b, 0xf2, - 0x5d, 0x40, 0xdf, 0xdf, 0x28, 0x4e, 0xa0, 0x6d, 0x36, 0xa5, 0x2b, 0x3d, 0x9e, 0xbe, 0x78, 0x68, - 0xf3, 0xe3, 0xab, 0x4d, 0x49, 0x92, 0xd1, 0x7d, 0xb7, 0xe0, 0xbe, 0x6e, 0xa1, 0xdf, 0x6d, 0x04, - 0x6d, 0x5b, 0x87, 0x11, 0x04, 0xb3, 0x79, 0xdc, 0xc2, 0x2e, 0x84, 0x9f, 0x67, 0xf3, 0x58, 0xd8, - 0x80, 0x9c, 0xc5, 0x01, 0x07, 0xe4, 0x2c, 0x0e, 0xcf, 0x07, 0xdb, 0x3f, 0xc3, 0xd6, 0x76, 0x37, - 0x14, 0x3f, 0x77, 0x43, 0xf1, 0x7b, 0x37, 0x14, 0x5f, 0x22, 0xeb, 0xa4, 0x5c, 0x2c, 0x22, 0xfe, - 0x12, 0x6f, 0xfe, 0x06, 0x00, 0x00, 0xff, 0xff, 0x64, 0x87, 0x06, 0x4f, 0x44, 0x03, 0x00, 0x00, +func init() { proto.RegisterFile("remote.proto", fileDescriptor_remote_930be8df34ca631b) } + +var fileDescriptor_remote_930be8df34ca631b = []byte{ + // 535 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x53, 0x51, 0x8b, 0xd3, 0x40, + 0x10, 0xbe, 0x34, 0x6d, 0x72, 0x9d, 0x96, 0x23, 0x0e, 0x77, 0x5e, 0x15, 0xad, 0x47, 0x9e, 0x0a, + 0x4a, 0x8f, 0xd6, 0x07, 0x41, 0xee, 0x41, 0x0e, 0x8a, 0x82, 0x57, 0xa1, 0x7b, 0x05, 0xc1, 0x97, + 0x23, 0xbd, 0x8e, 0xd7, 0x4a, 0x36, 0x49, 0xb3, 0x1b, 0xa1, 0x3f, 0xc4, 0xff, 0xd4, 0x47, 0x7f, + 0x81, 0x68, 0x7f, 0x89, 0xec, 0x6e, 0xd2, 0xae, 0x78, 0x3e, 0xf9, 0x96, 0x99, 0xf9, 0xe6, 0x9b, + 0xef, 0xdb, 0x99, 0x40, 0x3b, 0x27, 0x9e, 0x4a, 0xea, 0x67, 0x79, 0x2a, 0x53, 0x84, 0x2c, 0x4f, + 0x39, 0xc9, 0x05, 0x15, 0xe2, 0xf1, 0xf1, 0x5d, 0x7a, 0x97, 0xea, 0xf4, 0xb9, 0xfa, 0x32, 0x88, + 0xf0, 0x0a, 0xda, 0x1f, 0xf3, 0xa5, 0x24, 0x46, 0xab, 0x82, 0x84, 0xc4, 0x0b, 0x00, 0xb9, 0xe4, + 0x24, 0x28, 0x5f, 0x92, 0xe8, 0x38, 0x67, 0x6e, 0xaf, 0x35, 0x7c, 0xd8, 0xdf, 0xd3, 0xf4, 0xa7, + 0x4b, 0x4e, 0xd7, 0xba, 0x7a, 0x59, 0xdf, 0xfc, 0x78, 0x76, 0xc0, 0x2c, 0x7c, 0xf8, 0x06, 0x5a, + 0x8c, 0xa2, 0x79, 0x45, 0x36, 0x00, 0x7f, 0x55, 0xd8, 0x4c, 0x0f, 0x6c, 0xa6, 0x49, 0x41, 0xf9, + 0xba, 0x24, 0xa9, 0x70, 0xe1, 0x5b, 0x68, 0x1b, 0x06, 0x91, 0xa5, 0x89, 0x20, 0x7c, 0x05, 0x7e, + 0x4e, 0xa2, 0x88, 0x65, 0x45, 0x71, 0xfa, 0x17, 0x05, 0xd3, 0xf5, 0x8a, 0xa8, 0x44, 0x87, 0x1b, + 0x07, 0x1a, 0xba, 0x8c, 0x2f, 0x00, 0x85, 0x8c, 0x72, 0x79, 0xa3, 0x85, 0xca, 0x88, 0x67, 0x37, + 0x5c, 0xb1, 0x39, 0x3d, 0x97, 0x05, 0xba, 0x32, 0xad, 0x0a, 0x63, 0x81, 0x3d, 0x08, 0x28, 0x99, + 0xff, 0x89, 0xad, 0x69, 0xec, 0x11, 0x25, 0x73, 0x1b, 0xf9, 0x1a, 0x0e, 0x79, 0x24, 0x6f, 0x17, + 0x94, 0x8b, 0x8e, 0xab, 0xb5, 0x75, 0x6c, 0x6d, 0x57, 0xd1, 0x8c, 0xe2, 0xb1, 0x01, 0x94, 0xe2, + 0x76, 0x78, 0x7c, 0x0e, 0x8d, 0xc5, 0x32, 0x91, 0xa2, 0x53, 0x3f, 0x73, 0x7a, 0xad, 0xe1, 0x89, + 0xdd, 0xa8, 0xfc, 0xbf, 0x53, 0x45, 0x66, 0x30, 0xe1, 0x7b, 0x68, 0x59, 0x46, 0xff, 0x73, 0x45, + 0x17, 0xe0, 0x5d, 0x47, 0x3c, 0x8b, 0x09, 0x8f, 0xa1, 0xf1, 0x35, 0x8a, 0x0b, 0xd2, 0x4f, 0xe1, + 0x30, 0x13, 0xe0, 0x13, 0x68, 0xee, 0xbc, 0x97, 0xc6, 0xf7, 0x89, 0x70, 0x05, 0xb0, 0x67, 0xc7, + 0x73, 0xf0, 0x62, 0xe5, 0xf2, 0xde, 0xf5, 0x6a, 0xff, 0xa5, 0x80, 0x12, 0x86, 0x43, 0xf0, 0x85, + 0x1e, 0xae, 0xde, 0x54, 0x75, 0xa0, 0xdd, 0x61, 0x74, 0x55, 0x8b, 0x2c, 0x81, 0xe1, 0x00, 0x1a, + 0x9a, 0x0a, 0x11, 0xea, 0x49, 0xc4, 0x8d, 0xdc, 0x26, 0xd3, 0xdf, 0x7b, 0x0f, 0x35, 0x9d, 0x34, + 0x41, 0xf8, 0xcd, 0x81, 0xb6, 0xfd, 0xfc, 0x38, 0x80, 0xba, 0x5c, 0x67, 0xa6, 0xf5, 0x68, 0xf8, + 0xf4, 0x5f, 0x6b, 0xea, 0x4f, 0xd7, 0x19, 0x31, 0x0d, 0xdd, 0x4d, 0xab, 0xdd, 0x37, 0xcd, 0xb5, + 0xa7, 0xf5, 0xa0, 0xae, 0xfa, 0xd0, 0x83, 0xda, 0x68, 0x12, 0x1c, 0xa0, 0x0f, 0xee, 0x87, 0xd1, + 0x24, 0x70, 0x54, 0x82, 0x8d, 0x82, 0x9a, 0x4e, 0xb0, 0x51, 0xe0, 0x86, 0x5f, 0xa0, 0xb9, 0x5b, + 0x2e, 0x9e, 0x82, 0x2f, 0x24, 0x59, 0xb7, 0xe8, 0xa9, 0x70, 0x2c, 0xd4, 0xe4, 0xcf, 0x45, 0x72, + 0x5b, 0x4d, 0x56, 0xdf, 0xf8, 0x08, 0x0e, 0xcd, 0x0d, 0x73, 0xa1, 0x87, 0xbb, 0xcc, 0xd7, 0xf1, + 0x58, 0xe0, 0x09, 0x78, 0xea, 0x60, 0xb9, 0xb9, 0x25, 0x97, 0x35, 0x28, 0x99, 0x8f, 0xc5, 0x65, + 0x67, 0xf3, 0xab, 0x7b, 0xb0, 0xd9, 0x76, 0x9d, 0xef, 0xdb, 0xae, 0xf3, 0x73, 0xdb, 0x75, 0x3e, + 0x79, 0xca, 0x75, 0x36, 0x9b, 0x79, 0xfa, 0xcf, 0x7f, 0xf9, 0x3b, 0x00, 0x00, 0xff, 0xff, 0xe5, + 0xcf, 0xa9, 0xcb, 0x2b, 0x04, 0x00, 0x00, } diff --git a/pkg/store/prompb/remote.proto b/pkg/store/prompb/remote.proto index ec8f3253982..2f7cf3fc0e7 100644 --- a/pkg/store/prompb/remote.proto +++ b/pkg/store/prompb/remote.proto @@ -28,6 +28,10 @@ option (gogoproto.goproto_getters_all) = false; option go_package = "prompb"; +message WriteRequest { + repeated prometheus.TimeSeries timeseries = 1 [(gogoproto.nullable) = false]; +} + message ReadRequest { repeated Query queries = 1 [(gogoproto.nullable) = false]; } @@ -41,6 +45,7 @@ message Query { int64 start_timestamp_ms = 1; int64 end_timestamp_ms = 2; repeated LabelMatcher matchers = 3 [(gogoproto.nullable) = false]; + prometheus.ReadHints hints = 4; } message QueryResult { @@ -73,4 +78,11 @@ message LabelMatcher { Type type = 1; string name = 2; string value = 3; +} + +message ReadHints { + int64 step_ms = 1; // Query step size in milliseconds. + string func = 2; // String representation of surrounding function or aggregation. + int64 start_ms = 3; // Start time in milliseconds. + int64 end_ms = 4; // End time in milliseconds. } \ No newline at end of file diff --git a/scripts/prometheus.yml b/scripts/prometheus.yml new file mode 100644 index 00000000000..932a298b284 --- /dev/null +++ b/scripts/prometheus.yml @@ -0,0 +1,10 @@ +# When the Thanos remote-write-receive component is started, +# this is an example configuration of a Prometheus server that +# would scrape a local node-exporter and replicate its data to +# the remote write endpoint. +scrape_configs: + - job_name: 'node' + static_configs: + - targets: ['localhost:9100'] +remote_write: +- url: "http://localhost:19291/receive" diff --git a/scripts/quickstart.sh b/scripts/quickstart.sh index 29d6a680593..f53f7e3816c 100755 --- a/scripts/quickstart.sh +++ b/scripts/quickstart.sh @@ -89,6 +89,12 @@ done sleep 0.5 +OBJSTORECFG="" +if [ -n "${MINIO_ENABLED}" ] +then +OBJSTORECFG="--objstore.config-file data/bucket.yml" +fi + # Start one sidecar for each Prometheus server. for i in `seq 1 3` do @@ -98,7 +104,7 @@ do --http-address 0.0.0.0:1919${i} \ --prometheus.url http://localhost:909${i} \ --tsdb.path data/prom${i} \ - --objstore.config-file data/bucket.yml \ + ${OBJSTORECFG} \ --cluster.address 0.0.0.0:1939${i} \ --cluster.advertise-address 127.0.0.1:1939${i} \ --cluster.peers 127.0.0.1:19391 & @@ -116,7 +122,7 @@ then --grpc-address 0.0.0.0:19691 \ --http-address 0.0.0.0:19791 \ --data-dir data/store \ - --objstore.config-file data/bucket.yml \ + ${OBJSTORECFG} \ --cluster.address 0.0.0.0:19891 \ --cluster.advertise-address 127.0.0.1:19891 \ --cluster.peers 127.0.0.1:19391 & @@ -136,4 +142,22 @@ do --cluster.peers 127.0.0.1:19391 & done +sleep 0.5 + +if [ -n "${REMOTE_WRITE_ENABLED}" ] +then + ./thanos remote-write-receive \ + --debug.name remote-write-receive \ + --log.level debug \ + --tsdb.path "./data/remote-write-receive-data" \ + --grpc-address 0.0.0.0:19891 \ + --http-address 0.0.0.0:19691 \ + --remote-write.address 0.0.0.0:19291 \ + --cluster.address 0.0.0.0:18291 \ + --cluster.advertise-address 127.0.0.1:18291 \ + --cluster.peers 127.0.0.1:19391 & + + ./prometheus --config.file scripts/prometheus.yml --storage.tsdb.path "data/local-prometheus-data/" & +fi + wait