Skip to content

Commit

Permalink
[query] Add ability to configure reflection for query GRPC server (#1856
Browse files Browse the repository at this point in the history
)
  • Loading branch information
robskillington authored Sep 11, 2019
1 parent fea1d79 commit bbdac9e
Show file tree
Hide file tree
Showing 7 changed files with 514 additions and 147 deletions.
4 changes: 4 additions & 0 deletions src/cmd/services/m3query/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,10 @@ type RPCConfiguration struct {
// NB: this is deprecated in favor of using RemoteZones, as setting
// RemoteListenAddresses will only allow for a single remote zone to be used.
RemoteListenAddresses []string `yaml:"remoteListenAddresses"`

// ReflectionEnabled will enable reflection on the GRPC server, useful
// for testing connectivity with grpcurl, etc.
ReflectionEnabled bool `yaml:"reflectionEnabled"`
}

// TagOptionsConfiguration is the configuration for shared tag options
Expand Down
508 changes: 398 additions & 110 deletions src/query/generated/proto/rpcpb/query.pb.go

Large diffs are not rendered by default.

10 changes: 9 additions & 1 deletion src/query/generated/proto/rpcpb/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,20 @@ option go_package = "rpcpb";
import "github.com/m3db/m3/src/metrics/generated/proto/policypb/policy.proto";

service Query {
rpc Health(HealthRequest) returns (HealthResponse);
rpc Fetch(FetchRequest) returns (stream FetchResponse);

rpc Search(SearchRequest) returns (stream SearchResponse);
rpc CompleteTags(CompleteTagsRequest) returns (stream CompleteTagsResponse);
}

message HealthRequest {
}

message HealthResponse {
string uptimeDuration = 1;
int64 uptimeNanoseconds = 2;
}

message FetchRequest {
int64 start = 1;
int64 end = 2;
Expand Down
9 changes: 9 additions & 0 deletions src/query/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ import (
"github.com/uber-go/tally"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)

const (
Expand Down Expand Up @@ -825,9 +826,17 @@ func startGRPCServer(
instrumentOpts instrument.Options,
) (*grpc.Server, error) {
logger := instrumentOpts.Logger()

logger.Info("creating gRPC server")
server := tsdbRemote.NewGRPCServer(storage,
queryContextOptions, poolWrapper, instrumentOpts)
if cfg.ReflectionEnabled {
reflection.Register(server)
}

logger.Info("gRPC server reflection configured",
zap.Bool("enabled", cfg.ReflectionEnabled))

listener, err := net.Listen("tcp", cfg.ListenAddress)
if err != nil {
return nil, err
Expand Down
91 changes: 55 additions & 36 deletions src/query/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package server

import (
"context"
"fmt"
"io/ioutil"
"net"
Expand Down Expand Up @@ -202,41 +203,6 @@ func waitForServerHealthy(t *testing.T, addr string) {
maxWait.String())
}

type queryServer struct {
reads, searches, tagCompletes int
mu sync.Mutex
}

func (s *queryServer) Fetch(
*rpc.FetchRequest,
rpc.Query_FetchServer,
) error {
s.mu.Lock()
defer s.mu.Unlock()
s.reads++
return nil
}

func (s *queryServer) Search(
*rpc.SearchRequest,
rpc.Query_SearchServer,
) error {
s.mu.Lock()
defer s.mu.Unlock()
s.searches++
return nil
}

func (s *queryServer) CompleteTags(
*rpc.CompleteTagsRequest,
rpc.Query_CompleteTagsServer,
) error {
s.mu.Lock()
defer s.mu.Unlock()
s.tagCompletes++
return nil
}

func TestGRPCBackend(t *testing.T) {
lis, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
Expand Down Expand Up @@ -286,7 +252,7 @@ writeWorkerPoolPolicy:

s := grpc.NewServer()
defer s.GracefulStop()
qs := &queryServer{}
qs := newQueryServer()
rpc.RegisterQueryServer(s, qs)
go func() {
s.Serve(lis)
Expand Down Expand Up @@ -389,3 +355,56 @@ func TestNewPerQueryEnforcer(t *testing.T) {
floatsEqual(float64(r.Cost), 11)
require.NoError(t, r.Error)
}

var _ rpc.QueryServer = &queryServer{}

type queryServer struct {
up time.Time
reads, searches, tagCompletes int
mu sync.Mutex
}

func newQueryServer() *queryServer {
return &queryServer{up: time.Now()}
}

func (s *queryServer) Health(
ctx context.Context,
req *rpc.HealthRequest,
) (*rpc.HealthResponse, error) {
up := time.Since(s.up)
return &rpc.HealthResponse{
UptimeDuration: up.String(),
UptimeNanoseconds: int64(up),
}, nil
}

func (s *queryServer) Fetch(
*rpc.FetchRequest,
rpc.Query_FetchServer,
) error {
s.mu.Lock()
defer s.mu.Unlock()
s.reads++
return nil
}

func (s *queryServer) Search(
*rpc.SearchRequest,
rpc.Query_SearchServer,
) error {
s.mu.Lock()
defer s.mu.Unlock()
s.searches++
return nil
}

func (s *queryServer) CompleteTags(
*rpc.CompleteTagsRequest,
rpc.Query_CompleteTagsServer,
) error {
s.mu.Lock()
defer s.mu.Unlock()
s.tagCompletes++
return nil
}
14 changes: 14 additions & 0 deletions src/query/tsdb/remote/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package remote

import (
"context"
"sync"
"time"

Expand All @@ -42,6 +43,7 @@ const defaultBatch = 128

// TODO: add metrics
type grpcServer struct {
createAt time.Time
poolErr error
batchSize int
storage m3.Storage
Expand Down Expand Up @@ -69,6 +71,7 @@ func NewGRPCServer(
) *grpc.Server {
server := grpc.NewServer()
grpcServer := &grpcServer{
createAt: time.Now(),
storage: store,
queryContextOpts: queryContextOpts,
poolWrapper: poolWrapper,
Expand All @@ -79,6 +82,17 @@ func NewGRPCServer(
return server
}

func (s *grpcServer) Health(
ctx context.Context,
req *rpc.HealthRequest,
) (*rpc.HealthResponse, error) {
uptime := time.Since(s.createAt)
return &rpc.HealthResponse{
UptimeDuration: uptime.String(),
UptimeNanoseconds: int64(uptime),
}, nil
}

func (s *grpcServer) waitForPools() (encoding.IteratorPools, error) {
s.once.Do(func() {
s.pools, s.poolErr = s.poolWrapper.WaitForIteratorPools(poolTimeout)
Expand Down
25 changes: 25 additions & 0 deletions src/query/tsdb/remote/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/m3db/m3/src/dbnode/encoding"
"github.com/m3db/m3/src/query/block"
m3err "github.com/m3db/m3/src/query/errors"
rpc "github.com/m3db/m3/src/query/generated/proto/rpcpb"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/pools"
"github.com/m3db/m3/src/query/storage"
Expand Down Expand Up @@ -179,6 +180,30 @@ func TestRpc(t *testing.T) {
checkFetch(ctx, t, client, read, readOpts)
}

func TestRpcHealth(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

ctx, _, _ := createCtxReadOpts(t)
store := newMockStorage(t, ctrl, mockStorageOptions{})
listener := startServer(t, ctrl, store)
serverClient := buildClient(t, []string{listener.Addr().String()})
defer func() {
assert.NoError(t, serverClient.Close())
}()

client, ok := serverClient.(*grpcClient)
require.True(t, ok)

resp, err := client.client.Health(ctx, &rpc.HealthRequest{})
require.NoError(t, err)

uptime, err := time.ParseDuration(resp.UptimeDuration)
require.NoError(t, err)
assert.True(t, uptime > 0)
assert.Equal(t, uptime, time.Duration(resp.UptimeNanoseconds))
}

func TestRpcMultipleRead(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down

0 comments on commit bbdac9e

Please sign in to comment.