-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrpc_client.go
173 lines (149 loc) · 4.26 KB
/
rpc_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package plugin
import (
"crypto/tls"
"fmt"
"io"
"net"
"net/rpc"
"github.com/hashicorp/yamux"
)
// RPCClient connects to an RPCServer over net/rpc to dispense plugin types.
type RPCClient struct {
	// broker is used to dial the per-plugin streams handed out by Dispense.
	broker *MuxBroker

	// control is the net/rpc client carrying the Control.* and Dispenser.*
	// calls.
	control *rpc.Client

	// plugins maps a plugin name to its implementation; Dispense rejects
	// names that are not present here.
	plugins map[string]Plugin

	// stdout and stderr are the streams that SyncStreams copies into the
	// caller-supplied writers.
	stdout net.Conn
	stderr net.Conn
}
// newRPCClient dials the plugin described by c and returns an RPCClient whose
// stdout/stderr syncing has already been started. The Client argument is
// expected to be successfully started already with a lock held.
//
// On any failure the connection (or the partially built client) is closed
// and a nil client is returned together with the error.
func newRPCClient(c *Client) (*RPCClient, error) {
	conn, err := rpcDial(c)
	if err != nil {
		return nil, err
	}

	// Keep-alive is only available on raw TCP connections. It is enabled so
	// that the connection doesn't die; the returned error is discarded, so
	// a failure to enable it is not fatal.
	if tcp, isTCP := conn.(*net.TCPConn); isTCP {
		tcp.SetKeepAlive(true)
	}

	// Wrap the connection in TLS when a TLS configuration was supplied.
	if tlsCfg := c.config.TLSConfig; tlsCfg != nil {
		conn = tls.Client(conn, tlsCfg)
	}

	// Build the actual RPC client on top of the (possibly wrapped) connection.
	client, err := NewRPCClient(conn, c.config.Plugins)
	if err != nil {
		conn.Close()
		return nil, err
	}

	// Start stream syncing so that stdout and stderr are forwarded.
	if err := client.SyncStreams(c.config.SyncStdout, c.config.SyncStderr); err != nil {
		client.Close()
		return nil, err
	}

	return client, nil
}
// NewRPCClient creates a client from an already-open connection-like value.
// Dial is typically used instead.
//
// The connection is multiplexed with yamux and three streams are opened on
// it, in this order: control, stdout, stderr. If the yamux session cannot be
// created the connection is closed; if any stream fails to open the session
// is closed. In both cases a nil client and the error are returned.
func NewRPCClient(conn io.ReadWriteCloser, plugins map[string]Plugin) (*RPCClient, error) {
	// Create the yamux client session so we can multiplex.
	session, err := yamux.Client(conn, nil)
	if err != nil {
		conn.Close()
		return nil, err
	}

	// Open the control, stdout and stderr streams. The order of the opens
	// matters, so they are performed strictly in sequence.
	var streams [3]net.Conn
	for i := range streams {
		streams[i], err = session.Open()
		if err != nil {
			session.Close()
			return nil, err
		}
	}
	control, stdout, stderr := streams[0], streams[1], streams[2]

	// Create the broker and run it in the background.
	broker := newMuxBroker(session)
	go broker.Run()

	// Assemble the client around the broker and the control stream.
	client := &RPCClient{
		broker:  broker,
		control: rpc.NewClient(control),
		plugins: plugins,
		stdout:  stdout,
		stderr:  stderr,
	}
	return client, nil
}
// SyncStreams starts copying the client's stdout and stderr streams into the
// given writers.
//
// It returns immediately, always with a nil error; the copying continues in
// background goroutines, so there is no need to call this from a goroutine
// of your own.
//
// This should never be called multiple times.
func (c *RPCClient) SyncStreams(stdout io.Writer, stderr io.Writer) error {
	pumps := []struct {
		label string
		dst   io.Writer
		src   net.Conn
	}{
		{label: "stdout", dst: stdout, src: c.stdout},
		{label: "stderr", dst: stderr, src: c.stderr},
	}

	// One background copier per stream, started in stdout-then-stderr order.
	for _, p := range pumps {
		go copyStream(p.label, p.dst, p.src)
	}
	return nil
}
// Close asks the plugin to quit and then closes everything the client holds:
// the control RPC client, the stdout and stderr streams, and the broker. The
// client is no longer usable after this is called.
//
// All four are always closed, even when the quit request or an earlier close
// fails, so that one failure does not leave the remaining streams or the
// broker open.
//
// The returned error is the first close error encountered, if any; otherwise
// it is the error from the Control.Quit call. A failed Control.Quit therefore
// always yields a non-nil error, which Client.Kill relies on to fall back to
// a process.Kill.
func (c *RPCClient) Close() error {
	// Ask the control channel to exit gracefully. The error is kept rather
	// than returned right away because the streams must be closed regardless.
	var empty struct{}
	quitErr := c.control.Call("Control.Quit", true, &empty)

	// Remember only the first close failure, but keep going so that every
	// resource gets its Close call.
	var closeErr error
	record := func(err error) {
		if err != nil && closeErr == nil {
			closeErr = err
		}
	}
	record(c.control.Close())
	record(c.stdout.Close())
	record(c.stderr.Close())
	record(c.broker.Close())

	if closeErr != nil {
		return closeErr
	}

	// No close failed: report the outcome of Control.Quit. This MUST be
	// non-nil when the quit request failed so that Client.Kill will properly
	// try a process.Kill.
	return quitErr
}
// Dispense returns a client-side value for the plugin registered under name.
//
// The name must be present in the plugin map given at construction time;
// otherwise an "unknown plugin type" error is returned without contacting
// the server. For a known name, the server is asked through the
// Dispenser.Dispense RPC for a stream id, that id is dialed through the
// broker, and the resulting connection is wrapped in a net/rpc client and
// handed to the plugin's Client method, whose result is returned as-is.
func (c *RPCClient) Dispense(name string) (interface{}, error) {
	impl, known := c.plugins[name]
	if !known {
		return nil, fmt.Errorf("unknown plugin type: %s", name)
	}

	// Ask the server which stream id serves this plugin.
	var streamID uint32
	err := c.control.Call("Dispenser.Dispense", name, &streamID)
	if err != nil {
		return nil, err
	}

	// Dial that stream through the broker.
	conn, err := c.broker.Dial(streamID)
	if err != nil {
		return nil, err
	}

	rpcClient := rpc.NewClient(conn)
	return impl.Client(c.broker, rpcClient)
}
// Ping issues a Control.Ping call over the control connection to ensure the
// connection is still alive.
//
// The error from the RPC call is returned unmodified so that it can be
// inspected for further analysis. Any error returned from here would
// indicate that the connection to the plugin is not healthy.
func (c *RPCClient) Ping() error {
	var reply struct{}
	err := c.control.Call("Control.Ping", true, &reply)
	return err
}