Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support gRPC for query service #1307

Merged
merged 33 commits into from
Apr 22, 2019
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
ab4d728
Initial commit, grpc for query-service
Jan 29, 2019
650fc55
Re-evaluate fields in GRPCHandler
Jan 29, 2019
30033df
Use cmux, use stream in GetTraceResponse, regenerate *pb.go and swagg…
Feb 26, 2019
6467d44
Edit GetTrace HTTP verb
Feb 28, 2019
5a65f9f
Re-add query definitions in a new file
Mar 27, 2019
9dcfe08
Address comments, correct API endpoint for query grpc
Mar 29, 2019
114ddba
Add definitions for other query functions
Mar 31, 2019
c62ce29
Address comments, make API as close to plugin proto
Apr 2, 2019
39dc5d4
Fix api definitions
Apr 3, 2019
20f028e
Add dependency query utility
Apr 3, 2019
14645f9
Make fmt, add findtraces implementation
Apr 3, 2019
3bb4cb1
Correct rebase
Apr 3, 2019
2854624
Merge branch 'master' into query-service-grpc
Apr 4, 2019
b4f58cd
Use end_time instead of duration
Apr 4, 2019
472de89
dep ensure, fix build
Apr 4, 2019
e698953
Merge branch 'master' into query-service-grpc
Apr 4, 2019
4a85f6f
Addressed comments, fix proto tool versions
Apr 6, 2019
b03b497
Fix build
Apr 7, 2019
6ea3b82
Fix build
Apr 7, 2019
64b57da
Remove grpc-gateway code generation; use streaming
Apr 8, 2019
8a6f286
Implement streaming
Apr 8, 2019
63d5638
make fmt lint
Apr 8, 2019
8d2535d
Merge branch 'master' into query-service-grpc
yurishkuro Apr 8, 2019
289385b
dep update
Apr 8, 2019
86a771f
Start adding tests
Apr 9, 2019
86d770b
Add tests for GRPC handler
Apr 12, 2019
e7f0e0d
Fix grpc handler tests
Apr 13, 2019
612dc13
WIP fixing tests
Apr 14, 2019
9c3f1ee
Fix tests
Apr 15, 2019
c1d9abe
Fix tests
Apr 16, 2019
5a32ba0
Add test
Apr 16, 2019
b61fc18
Merge branch 'master' into query-service-grpc
Apr 16, 2019
8dfc90a
Merge branch 'master' into query-service-grpc
yurishkuro Apr 19, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,7 @@ required = [
[[constraint]]
name = "github.com/grpc-ecosystem/go-grpc-middleware"
version = "1.0.0"

[[constraint]]
name = "github.com/soheilhy/cmux"
version = "0.1.4"
147 changes: 147 additions & 0 deletions cmd/query/app/grpc_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"context"

"github.com/opentracing/opentracing-go"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

// GRPCHandler implements the GRPC endpoint of the query service.
type GRPCHandler struct {
queryService querysvc.QueryService
logger *zap.Logger
tracer opentracing.Tracer
}

// NewGRPCHandler returns a GRPCHandler
func NewGRPCHandler(queryService querysvc.QueryService, logger *zap.Logger, tracer opentracing.Tracer) *GRPCHandler {
gH := &GRPCHandler{
queryService: queryService,
logger: logger,
tracer: tracer,
}

return gH
}

// GetTrace is the GRPC handler to fetch traces based on TraceId.
func (g *GRPCHandler) GetTrace(ctx context.Context, r *api_v2.GetTraceRequest) (*api_v2.GetTraceResponseStream, error) {
ID := r.TraceId

trace, err := g.queryService.GetTrace(ctx, ID)
if err == spanstore.ErrTraceNotFound {
g.logger.Error("trace not found", zap.Error(err))
return nil, err
}
if err != nil {
g.logger.Error("Could not fetch spans from backend", zap.Error(err))
return nil, err
}

spans := make([]model.Span, 0, len(trace.Spans))
for _, span := range trace.Spans {
spans.append(*span)
}

return &api_v2.SpansResponseChunk{Spans: spans}, nil
}

// ArchiveTrace is the GRPC handler to archive traces.
func (g *GRPCHandler) ArchiveTrace(ctx context.Context, r *api_v2.ArchiveTraceRequest) (*api_v2.ArchiveTraceResponse, error) {
ID := r.TraceId

err := g.queryService.ArchiveTrace(ctx, ID)
if err == spanstore.ErrTraceNotFound {
g.logger.Error("trace not found", zap.Error(err))
return nil, err
}
if err != nil {
g.logger.Error("Could not fetch spans from backend", zap.Error(err))
return nil, err
}

return &api_v2.ArchiveTraceResponse{}, nil
}

// FindTraces is the GRPC handler to fetch traces based on TraceQueryParameters.
func (g *GRPCHandler) FindTraces(ctx context.Context, r *api_v2.FindTracesRequest) (*api_v2.FindTracesResponse, error) {
queryParams := spanstore.TraceQueryParameters{
ServiceName: r.ServiceName,
OperationName: r.OperationName,
Tags: r.Tags,
StartTimeMin: r.StartTimeMin,
StartTimeMax: r.StartTimeMax,
DurationMin: r.DurationMin,
DurationMax: r.DurationMax,
NumTraces: r.NumTraces,
}
traces, err := g.queryService.FindTraces(ctx, &queryParams)
if err != nil {
g.logger.Error("Error fetching traces", zap.Error(err))
return nil, err
}

spans := []model.Span{}
for _, trace := range traces {
for _, span := range trace.Spans {
spans.append(*span)
}
}
return &api_v2.SpansResponseChunk{Spans: spans}, nil
}

// GetServices is the GRPC handler to fetch services.
func (g *GRPCHandler) GetServices(ctx context.Context, r *api_v2.GetServicesRequest) (*api_v2.GetServicesReponse, error) {
services, err := g.queryService.GetServices(ctx)
if err != nil {
g.logger.Error("Error fetching services", zap.Error(err))
return nil, err
}

return &api_v2.GetServicesReponse{Services: services}, nil
}

// GetOperations is the GRPC handler to fetch operations.
func (g *GRPCHandler) GetOperations(ctx context.Context, r *api_v2.GetOperationsRequest) (*api_v2.GetOperationsReponse, error) {
service := r.Service
operations, err := g.queryService.GetOperations(ctx, service)
if err != nil {
g.logger.Error("Error fetching operations", zap.Error(err))
return nil, err
}

return &api_v2.GetOperationsReponse{Operations: operations}, nil
}

// GetDependencies is the GRPC handler to fetch dependencies.
func (g *GRPCHandler) GetDependencies(ctx context.Context, r *api_v2.GetDependenciesRequest) (*api_v2.GetDependenciesResponse, error) {
startTime := r.StartTime
endTime := r.EndTime
dependencies, err := g.queryService.GetDependencies(startTime, endTime.Sub(startTime))
if err != nil {
g.logger.Error("Error fetching dependencies", zap.Error(err))
return nil, err
}

return &api_v2.GetDependenciesReponse{Dependencies: dependencies}, nil
}
50 changes: 45 additions & 5 deletions cmd/query/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,23 @@ package main

import (
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"os"
"strconv"

"github.com/gorilla/handlers"
"github.com/opentracing/opentracing-go"
"github.com/soheilhy/cmux"
"github.com/spf13/cobra"
"github.com/spf13/viper"
jaegerClientConfig "github.com/uber/jaeger-client-go/config"
jaegerClientZapLog "github.com/uber/jaeger-client-go/log/zap"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"

"github.com/jaegertracing/jaeger/cmd/env"
"github.com/jaegertracing/jaeger/cmd/flags"
Expand All @@ -40,6 +44,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/version"
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
istorage "github.com/jaegertracing/jaeger/storage"
storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics"
)
Expand Down Expand Up @@ -116,18 +121,53 @@ func main() {
apiHandler.RegisterRoutes(r)
app.RegisterStaticHandler(r, logger, queryOpts)

portStr := ":" + strconv.Itoa(queryOpts.Port)
compressHandler := handlers.CompressHandler(r)
recoveryHandler := recoveryhandler.NewRecoveryHandler(logger, true)

// Create HTTP Server
httpServer := &http.Server{
Handler: recoveryHandler(compressHandler),
}

// Create GRPC Server.
grpcServer := grpc.NewServer()

grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))
annanay25 marked this conversation as resolved.
Show resolved Hide resolved

grpcHandler := app.NewGRPCHandler(*queryService, logger, tracer)
api_v2.RegisterQueryServiceHandler(grpcServer, grpcHandler)

// Prepare cmux conn.
conn, err := net.Listen("tcp", fmt.Sprintf(":%d", queryOpts.Port))
if err != nil {
logger.Fatal("Could not start listener", zap.Error(err))
}

// Create cmux server.
// cmux will reverse-proxy between HTTP and GRPC backends.
s := cmux.New(conn)

// Add GRPC and HTTP listeners.
grpcL := s.Match(
cmux.HTTP2HeaderField("content-type", "application/grpc"),
cmux.HTTP2HeaderField("content-type", "application/grpc+proto"))
httpL := s.Match(cmux.Any())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Booked #1482 to refactor this out of main


go func() {
logger.Info("Starting HTTP server", zap.Int("port", queryOpts.Port))
if err := http.ListenAndServe(portStr, recoveryHandler(compressHandler)); err != nil {
logger.Fatal("Could not launch service", zap.Error(err))
}
httpServer.Serve(httpL)
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
svc.HC().Set(healthcheck.Unavailable)
}()

go func() {
logger.Info("Starting GRPC server", zap.Int("port", queryOpts.Port))
grpcServer.Serve(grpcL)
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
svc.HC().Set(healthcheck.Unavailable)
}()

// Start cmux server.
s.Serve()
annanay25 marked this conversation as resolved.
Show resolved Hide resolved

svc.RunAndThen(nil)
return nil
},
Expand Down
Loading