-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdiscover.go
168 lines (133 loc) · 5.44 KB
/
discover.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
package main
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"strings"
"github.com/gogo/protobuf/proto"
descriptor "github.com/golang/protobuf/protoc-gen-go/descriptor"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
pbreflect "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
)
func dialOptions(endpoint string) (opts []grpc.DialOption) {
if strings.Contains(endpoint, "*") {
zlog.Info("with transport credentials")
creds := credentials.NewTLS(&tls.Config{
InsecureSkipVerify: true,
})
opts = append(opts, grpc.WithTransportCredentials(creds))
} else {
zlog.Info("insecure endpoint")
opts = append(opts, grpc.WithInsecure())
}
var maxCallRecvMsgSize = 1024 * 1024 * 100
var defaultCallOptions = []grpc.CallOption{grpc.MaxCallRecvMsgSize(maxCallRecvMsgSize), grpc.WaitForReady(true)}
opts = append(opts, grpc.WithDefaultCallOptions(defaultCallOptions...))
return opts
}
func discover(services []string, conf *config) error {
filesRequested := map[string]bool{}
for _, srv := range services {
srv = strings.TrimSpace(srv)
target := strings.Replace(srv, "*", "", -1)
zl := zlog.With(zap.String("service", srv))
zl.Info("querying service")
// CALL the reflection API there
opts := dialOptions(srv)
conn, err := grpc.Dial(target, opts...)
errorCheck(zl, "dialing to service error", err)
client := pbreflect.NewServerReflectionClient(conn)
stream, err := client.ServerReflectionInfo(context.Background())
errorCheck(zl, "setting up client error", err)
err = stream.Send(&pbreflect.ServerReflectionRequest{
Host: target,
MessageRequest: &pbreflect.ServerReflectionRequest_ListServices{ListServices: "*"},
})
errorCheck(zl, "sending list services request error", err)
resp, err := stream.Recv()
errorCheck(zl, "receiving list services response error", err)
zl.Info("reflection list services response", zap.Any("response", toMap(resp)))
switch msg := resp.MessageResponse.(type) {
case *pbreflect.ServerReflectionResponse_ListServicesResponse:
var reqs int
for _, serviceResp := range msg.ListServicesResponse.Service {
serviceName := serviceResp.Name
conf.allServices = append(conf.allServices, serviceName)
conf.serviceToEndpoint[serviceName] = srv
err = stream.Send(&pbreflect.ServerReflectionRequest{
Host: target,
MessageRequest: &pbreflect.ServerReflectionRequest_FileContainingSymbol{FileContainingSymbol: serviceName},
})
errorCheck(zl, "sending reflection request error", err)
err = stream.Send(&pbreflect.ServerReflectionRequest{
Host: target,
MessageRequest: &pbreflect.ServerReflectionRequest_AllExtensionNumbersOfType{AllExtensionNumbersOfType: serviceName},
})
errorCheck(zl, "sending reflection request error", err)
reqs += 2
}
for i := 0; i < reqs; i++ {
resp, err := stream.Recv()
errorCheck(zl, "receiving reflection response error", err)
origReq := resp.OriginalRequest
zl.Info("reflection request response", zap.Any("request", toMap(origReq)), zap.Any("resonse", toMap(resp)))
switch msg := resp.MessageResponse.(type) {
case *pbreflect.ServerReflectionResponse_AllExtensionNumbersResponse:
r := msg.AllExtensionNumbersResponse
origSymbol := origReq.MessageRequest.(*pbreflect.ServerReflectionRequest_AllExtensionNumbersOfType).AllExtensionNumbersOfType
zl.Info("all extensions number",
zap.String("base_type", r.BaseTypeName),
zap.Int32s("extension_number", r.ExtensionNumber),
zap.String("original_symbol", origSymbol),
)
conf.extensionNumbers[origSymbol] = resp
case *pbreflect.ServerReflectionResponse_FileDescriptorResponse:
r := msg.FileDescriptorResponse
var filenames []string
for _, descFile := range r.FileDescriptorProto {
desc := &descriptor.FileDescriptorProto{}
err = proto.Unmarshal(descFile, desc)
errorCheck(zl, "unmarshal file descriptor proto error", err)
filenames = append(filenames, desc.Dependency...)
}
for _, fileName := range filenames {
if !filesRequested[fileName] {
err = stream.Send(&pbreflect.ServerReflectionRequest{
Host: target,
MessageRequest: &pbreflect.ServerReflectionRequest_FileByFilename{FileByFilename: fileName},
})
reqs++
filesRequested[fileName] = true
}
}
switch origPayload := origReq.MessageRequest.(type) {
case *pbreflect.ServerReflectionRequest_FileContainingSymbol:
origSymbol := origPayload.FileContainingSymbol
conf.fileContainingSymbol[origSymbol] = resp
case *pbreflect.ServerReflectionRequest_FileByFilename:
origFilename := origPayload.FileByFilename
conf.filesByFilename[origFilename] = resp
}
case *pbreflect.ServerReflectionResponse_ErrorResponse:
zl.Warn("received reflection error response", zap.Any("response", toMap(msg.ErrorResponse)))
default:
zl.Warn("an unpextec response type was received but not handled")
}
}
default:
errorCheck(zl, "wuut, invalid response to the request we made error", fmt.Errorf("we received type %T %+v", msg, msg))
}
errorCheck(zl, "close send error", stream.CloseSend())
}
return nil
}
func toMap(any interface{}) map[string]interface{} {
cnt, err := json.Marshal(any)
errorCheck(zlog, "marshal response", err)
out := map[string]interface{}{}
errorCheck(zlog, "unmarshal response", json.Unmarshal(cnt, &out))
return out
}