Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor grpc server creation in own package #1487

Merged
merged 9 commits into from
Apr 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions cmd/flags/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,22 @@ type Service struct {
MetricsFactory metrics.Factory

signalsChannel chan os.Signal

hcStatusChannel chan healthcheck.Status
}

// NewService creates a new Service.
func NewService(adminPort int) *Service {
signalsChannel := make(chan os.Signal)
hcStatusChannel := make(chan healthcheck.Status)
signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM)

grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))

return &Service{
Admin: NewAdminServer(adminPort),
signalsChannel: signalsChannel,
Admin: NewAdminServer(adminPort),
signalsChannel: signalsChannel,
hcStatusChannel: hcStatusChannel,
}
}

Expand All @@ -76,6 +80,11 @@ func (s *Service) AddFlags(flagSet *flag.FlagSet) {
s.Admin.AddFlags(flagSet)
}

// SetHealthCheckStatus sets status of healthcheck
func (s *Service) SetHealthCheckStatus(status healthcheck.Status) {
s.hcStatusChannel <- healthcheck.Unavailable
}

// Start bootstraps the service and starts the admin server.
func (s *Service) Start(v *viper.Viper) error {
if err := TryLoadConfigFile(v); err != nil {
Expand Down Expand Up @@ -120,7 +129,16 @@ func (s *Service) HC() *healthcheck.HealthCheck {
// If then runs the shutdown function and exits.
func (s *Service) RunAndThen(shutdown func()) {
s.HC().Ready()
<-s.signalsChannel

statusLoop:
for {
select {
case status := <-s.hcStatusChannel:
s.HC().Set(status)
case <-s.signalsChannel:
break statusLoop
}
}

yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
s.Logger.Info("Shutting down")
s.HC().Set(healthcheck.Unavailable)
Expand Down
4 changes: 2 additions & 2 deletions cmd/query/app/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ const maxSpanCountInChunk = 10

// GRPCHandler implements the GRPC endpoint of the query service.
type GRPCHandler struct {
queryService querysvc.QueryService
queryService *querysvc.QueryService
logger *zap.Logger
tracer opentracing.Tracer
}

// NewGRPCHandler returns a GRPCHandler
func NewGRPCHandler(queryService querysvc.QueryService, logger *zap.Logger, tracer opentracing.Tracer) *GRPCHandler {
func NewGRPCHandler(queryService *querysvc.QueryService, logger *zap.Logger, tracer opentracing.Tracer) *GRPCHandler {
gH := &GRPCHandler{
queryService: queryService,
logger: logger,
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ var (
func newGRPCServer(t *testing.T, q *querysvc.QueryService, logger *zap.Logger, tracer opentracing.Tracer) (*grpc.Server, net.Addr) {
lis, _ := net.Listen("tcp", grpcServerPort)
grpcServer := grpc.NewServer()
grpcHandler := NewGRPCHandler(*q, logger, tracer)
grpcHandler := NewGRPCHandler(q, logger, tracer)
api_v2.RegisterQueryServiceServer(grpcServer, grpcHandler)

go func() {
Expand Down
140 changes: 140 additions & 0 deletions cmd/query/app/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"fmt"
"net"
"net/http"

"github.com/gorilla/handlers"
"github.com/opentracing/opentracing-go"
"github.com/soheilhy/cmux"
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/jaegertracing/jaeger/cmd/flags"
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/recoveryhandler"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

// Server runs HTTP, Mux and a grpc server
type Server struct {
svc *flags.Service
querySvc *querysvc.QueryService
queryOptions *QueryOptions

tracer opentracing.Tracer // TODO make part of flags.Service

conn net.Listener
grpcServer *grpc.Server
httpServer *http.Server
}

// NewServer creates and initializes Server
func NewServer(svc *flags.Service, querySvc *querysvc.QueryService, options *QueryOptions, tracer opentracing.Tracer) *Server {
return &Server{
svc: svc,
querySvc: querySvc,
queryOptions: options,
tracer: tracer,
grpcServer: createGRPCServer(querySvc, svc.Logger, tracer),
httpServer: createHTTPServer(querySvc, options, tracer, svc.Logger),
}
}

func createGRPCServer(querySvc *querysvc.QueryService, logger *zap.Logger, tracer opentracing.Tracer) *grpc.Server {
srv := grpc.NewServer()
handler := NewGRPCHandler(querySvc, logger, tracer)
api_v2.RegisterQueryServiceServer(srv, handler)
return srv
}

func createHTTPServer(querySvc *querysvc.QueryService, queryOpts *QueryOptions, tracer opentracing.Tracer, logger *zap.Logger) *http.Server {
apiHandlerOptions := []HandlerOption{
HandlerOptions.Logger(logger),
HandlerOptions.Tracer(tracer),
}
apiHandler := NewAPIHandler(
querySvc,
apiHandlerOptions...)
r := NewRouter()
if queryOpts.BasePath != "/" {
r = r.PathPrefix(queryOpts.BasePath).Subrouter()
}

apiHandler.RegisterRoutes(r)
RegisterStaticHandler(r, logger, queryOpts)
compressHandler := handlers.CompressHandler(r)
recoveryHandler := recoveryhandler.NewRecoveryHandler(logger, true)

return &http.Server{
Handler: recoveryHandler(compressHandler),
}
}

// Start http, GRPC and cmux servers concurrently
func (s *Server) Start() error {
conn, err := net.Listen("tcp", fmt.Sprintf(":%d", s.queryOptions.Port))
if err != nil {
return err
}
s.conn = conn

// cmux server acts as a reverse-proxy between HTTP and GRPC backends.
cmuxServer := cmux.New(s.conn)

grpcListener := cmuxServer.Match(
cmux.HTTP2HeaderField("content-type", "application/grpc"),
cmux.HTTP2HeaderField("content-type", "application/grpc+proto"))
httpListener := cmuxServer.Match(cmux.Any())

go func() {
s.svc.Logger.Info("Starting HTTP server", zap.Int("port", s.queryOptions.Port))
if err := s.httpServer.Serve(httpListener); err != nil {
s.svc.Logger.Error("Could not start HTTP server", zap.Error(err))
}
s.svc.SetHealthCheckStatus(healthcheck.Unavailable)
}()

// Start GRPC server concurrently
go func() {
s.svc.Logger.Info("Starting GRPC server", zap.Int("port", s.queryOptions.Port))
if err := s.grpcServer.Serve(grpcListener); err != nil {
s.svc.Logger.Error("Could not start GRPC server", zap.Error(err))
}
s.svc.SetHealthCheckStatus(healthcheck.Unavailable)
}()

// Start cmux server concurrently.
go func() {
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
s.svc.Logger.Info("Starting CMUX server", zap.Int("port", s.queryOptions.Port))
if err := cmuxServer.Serve(); err != nil {
s.svc.Logger.Error("Could not start multiplexed server", zap.Error(err))
}
s.svc.SetHealthCheckStatus(healthcheck.Unavailable)
}()

yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

// Close stops http, GRPC servers and closes the port listener.
func (s *Server) Close() {
s.grpcServer.Stop()
s.httpServer.Close()
s.conn.Close()
}
61 changes: 61 additions & 0 deletions cmd/query/app/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"testing"
"time"

"github.com/opentracing/opentracing-go"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/flags"
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/ports"
)

func TestServerError(t *testing.T) {
srv := &Server{
queryOptions: &QueryOptions{
Port: -1,
},
}
assert.Error(t, srv.Start())
}

func TestServer(t *testing.T) {
flagsSvc := flags.NewService(ports.AgentAdminHTTP)
flagsSvc.Logger = zap.NewNop()

querySvc := &querysvc.QueryService{}
tracer := opentracing.NoopTracer{}

server := NewServer(flagsSvc, querySvc, &QueryOptions{Port: ports.QueryAdminHTTP}, tracer)
assert.NoError(t, server.Start())

// TODO wait for servers to come up and test http and grpc endpoints
time.Sleep(1 * time.Second)

server.Close()
for i := 0; i < 10; i++ {
if server.svc.HC().Get() == healthcheck.Unavailable {
break
}
time.Sleep(1 * time.Millisecond)
}
assert.Equal(t, healthcheck.Unavailable, server.svc.HC().Get())
}
82 changes: 6 additions & 76 deletions cmd/query/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,24 @@ package main
import (
"fmt"
"log"
"net"
"net/http"
"os"

"github.com/gorilla/handlers"
"github.com/opentracing/opentracing-go"
"github.com/soheilhy/cmux"
"github.com/spf13/cobra"
"github.com/spf13/viper"
jaegerClientConfig "github.com/uber/jaeger-client-go/config"
jaegerClientZapLog "github.com/uber/jaeger-client-go/log/zap"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/jaegertracing/jaeger/cmd/env"
"github.com/jaegertracing/jaeger/cmd/flags"
"github.com/jaegertracing/jaeger/cmd/query/app"
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/recoveryhandler"
"github.com/jaegertracing/jaeger/pkg/version"
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
istorage "github.com/jaegertracing/jaeger/storage"
storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics"
)
Expand Down Expand Up @@ -105,77 +97,15 @@ func main() {
queryServiceOptions)

queryOpts := new(app.QueryOptions).InitFromViper(v)
apiHandlerOptions := []app.HandlerOption{
app.HandlerOptions.Logger(logger),
app.HandlerOptions.Tracer(tracer),
}
apiHandler := app.NewAPIHandler(
queryService,
apiHandlerOptions...)
r := app.NewRouter()
if queryOpts.BasePath != "/" {
r = r.PathPrefix(queryOpts.BasePath).Subrouter()
}
apiHandler.RegisterRoutes(r)
app.RegisterStaticHandler(r, logger, queryOpts)

compressHandler := handlers.CompressHandler(r)
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
recoveryHandler := recoveryhandler.NewRecoveryHandler(logger, true)

// Create HTTP Server
httpServer := &http.Server{
Handler: recoveryHandler(compressHandler),
}

// Create GRPC Server.
grpcServer := grpc.NewServer()
server := app.NewServer(svc, queryService, queryOpts, tracer)

grpcHandler := app.NewGRPCHandler(*queryService, logger, tracer)
api_v2.RegisterQueryServiceServer(grpcServer, grpcHandler)

// Prepare cmux conn.
conn, err := net.Listen("tcp", fmt.Sprintf(":%d", queryOpts.Port))
if err != nil {
logger.Fatal("Could not start listener", zap.Error(err))
if err := server.Start(); err != nil {
logger.Fatal("Could not start servers", zap.Error(err))
}

// Create cmux server.
// cmux will reverse-proxy between HTTP and GRPC backends.
s := cmux.New(conn)

// Add GRPC and HTTP listeners.
grpcL := s.Match(
cmux.HTTP2HeaderField("content-type", "application/grpc"),
cmux.HTTP2HeaderField("content-type", "application/grpc+proto"))
httpL := s.Match(cmux.Any())

// Start HTTP server concurrently
go func() {
logger.Info("Starting HTTP server", zap.Int("port", queryOpts.Port))
if err := httpServer.Serve(httpL); err != nil {
logger.Fatal("Could not start HTTP server", zap.Error(err))
}
svc.HC().Set(healthcheck.Unavailable)
}()

// Start GRPC server concurrently
go func() {
logger.Info("Starting GRPC server", zap.Int("port", queryOpts.Port))
if err := grpcServer.Serve(grpcL); err != nil {
logger.Fatal("Could not start GRPC server", zap.Error(err))
}
svc.HC().Set(healthcheck.Unavailable)
}()

// Start cmux server concurrently.
go func() {
if err := s.Serve(); err != nil {
logger.Fatal("Could not start multiplexed server", zap.Error(err))
}
svc.HC().Set(healthcheck.Unavailable)
}()

svc.RunAndThen(nil)
svc.RunAndThen(func() {
server.Close()
})
return nil
},
}
Expand Down