-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathdistribute.go
362 lines (307 loc) · 8 KB
/
distribute.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
package ep
import (
"context"
"encoding/gob"
"fmt"
"io"
"log"
"net"
"sync"
"time"
)
// MagicNumber used by the built-in Distributer to prefix all of its connections
// It can be used for routing connections. Serve rejects any incoming
// connection that does not begin with these 4 bytes.
var MagicNumber = []byte("EP01")

// register distRunner with gob so it can be encoded/decoded over the
// node-to-node connections (see Serve and distRunner.Run).
var _ = registerGob(&distRunner{})
// Distributer is an object that can distribute Runners to run in parallel on
// multiple nodes.
type Distributer interface {
	// Distribute a Runner to multiple node addresses. The returned Runner
	// transparently executes on all of the provided nodes when Run.
	Distribute(runner Runner, addrs ...string) Runner

	// Stop listening for incoming Runners to run, and close all open
	// connections.
	Close() error
}
// dialer is the optional interface (implemented by net.Dialer) that a
// listener passed to NewDistributer may implement to provide custom
// outgoing connections. See distributer.Dial.
type dialer interface {
	Dial(network, addr string) (net.Conn, error)
}
// NewDistributer creates a Distributer that can be used to distribute work of
// Runners across multiple nodes in a cluster. Distributer must be started on
// all node peers in order for them to receive work. You can also implement the
// dialer interface (implemented by net.Dialer) in order to provide your own
// connections:
//
//      type dialer interface {
//          Dial(network, addr string) (net.Conn, error)
//      }
func NewDistributer(addr string, listener net.Listener) Distributer {
	d := &distributer{
		listener: listener,
		addr:     addr,
		connsMap: make(map[string]chan net.Conn),
		l:        &sync.Mutex{},
		closeCh:  make(chan error, 1),
	}

	// begin accepting connections in the background immediately.
	go d.start()
	return d
}
// distributer is the built-in implementation of the Distributer interface.
type distributer struct {
	listener net.Listener              // accepts incoming peer connections
	addr     string                    // this node's advertised address
	connsMap map[string]chan net.Conn  // rendezvous channels keyed by "addr:uid"
	l        sync.Locker               // guards connsMap
	closeCh  chan error                // closed when the accept loop exits
}
// start runs the accept loop, serving each incoming connection on its own
// goroutine. It returns (and closes closeCh, unblocking Close) once the
// listener fails - typically because it was closed.
func (d *distributer) start() error {
	defer close(d.closeCh)
	for {
		c, err := d.listener.Accept()
		if err != nil {
			return err
		}

		go d.Serve(c)
	}
}
// Close stops listening for incoming Runners to run and reports the
// listener's close error, if any.
func (d *distributer) Close() error {
	err := d.listener.Close()

	// wait for start() above to exit. otherwise, attempts to re-bind to the
	// same address will infrequently fail with "bind: address already in use".
	// because while the listener is closed, there's still one pending Accept()
	// TODO: consider waiting for all served connections/runners?
	<-d.closeCh
	return err
}
// Dial opens a connection to the given address and prefixes it with the
// MagicNumber so the remote Serve recognizes it. If the listener implements
// the dialer interface it is used to create the connection; otherwise a
// plain net.Dial is used.
func (d *distributer) Dial(network, addr string) (net.Conn, error) {
	// a nil closeCh marks the distributer as unusable/closed.
	if d.closeCh == nil {
		return nil, io.ErrClosedPipe
	}

	var conn net.Conn
	var err error
	if custom, ok := d.listener.(dialer); ok {
		conn, err = custom.Dial(network, addr)
	} else {
		conn, err = net.Dial(network, addr)
	}
	if err != nil {
		return nil, err
	}

	// announce ourselves to the remote Serve.
	if _, err = conn.Write(MagicNumber); err != nil {
		conn.Close()
	}
	return conn, err
}
// Distribute wraps the runner in a distRunner that, upon Run, spreads the
// work across all of the provided node addresses with this node as master.
func (d *distributer) Distribute(runner Runner, addrs ...string) Runner {
	return &distRunner{Runner: runner, Addrs: addrs, MasterAddr: d.addr, d: d}
}
// Connect to a node address for the given uid. Used by the individual exchange
// runners to synchronize a specific logical point in the code. We need to
// ensure that both sides of the connection, when used with the same UID,
// resolve to the same connection, thus exactly one side dials while the other
// listens - decided by comparing the two addresses.
func (d *distributer) Connect(addr string, uid string) (net.Conn, error) {
	if d.addr >= addr {
		// listen, timeout after 1 second
		timer := time.NewTimer(time.Second)
		defer timer.Stop()

		select {
		case conn := <-d.connCh(addr + ":" + uid):
			return conn, nil // let it through
		case <-timer.C:
			return nil, fmt.Errorf("ep: connect timeout; no incoming conn")
		}
	}

	// dial
	conn, err := d.Dial("tcp", addr)
	if err != nil {
		return conn, err
	}

	if err = writeStr(conn, "D"); err != nil { // Data connection
		return conn, err
	}

	// identify this connection by our own address + uid, matching the key
	// that the remote side listens on.
	err = writeStr(conn, d.addr+":"+uid)
	return conn, err
}
// Serve handles a single incoming connection: it verifies the MagicNumber
// prefix, reads the connection-type marker, and then either hands the
// connection off to a waiting Connect call ("D") or decodes and executes a
// distributed runner ("X"), reporting its result back to the master.
func (d *distributer) Serve(conn net.Conn) error {
	prefix := make([]byte, 4)
	if _, err := io.ReadFull(conn, prefix); err != nil {
		return err
	}

	if string(prefix) != string(MagicNumber) {
		return fmt.Errorf("unrecognized connection. Missing MagicNumber prefix")
	}

	typee, err := readStr(conn)
	if err != nil {
		return err
	}

	switch typee {
	case "D": // data connection
		key, err := readStr(conn)
		if err != nil {
			return err
		}

		// wait for someone to claim it. ownership of conn transfers to the
		// claiming Connect call, so it's intentionally not closed here.
		d.connCh(key) <- conn
		return nil

	case "X": // execute runner connection
		defer conn.Close()

		r := &distRunner{d: d}
		err = gob.NewDecoder(conn).Decode(r)
		if err != nil {
			log.Println("ep: distributer error", err)
			return err
		}

		// drain the output
		// generally - if we're always using Gather, the output will be empty
		// perhaps we want to log/return an error when some of the data is
		// discarded here?
		out := make(chan Dataset)
		go drain(out)

		// run with an already-closed (empty) input channel.
		inp := make(chan Dataset, 1)
		close(inp)
		Run(context.Background(), r, inp, out, nil, &err)
		if err != nil {
			err = &errMsg{err.Error()}
		}

		// report back to master - either local error or nil payload
		if encErr := gob.NewEncoder(conn).Encode(&req{err}); encErr != nil {
			log.Println("ep: runner error", encErr)
			return encErr
		}
		return nil

	default:
		defer conn.Close()
		err := fmt.Errorf("unrecognized connection type: %s", typee)
		log.Println("ep: " + err.Error())
		return err
	}
}
// connCh returns the rendezvous channel for key k, lazily creating it on
// first use. Both sides of a Connect pair resolve to the same (unbuffered)
// channel, so the handoff in Serve blocks until a Connect claims it.
func (d *distributer) connCh(k string) chan net.Conn {
	d.l.Lock()
	defer d.l.Unlock()

	ch := d.connsMap[k]
	if ch == nil {
		ch = make(chan net.Conn)
		d.connsMap[k] = ch
	}
	return ch
}
// distRunner wraps around a runner, and upon the initial call to Run, it
// distributes the runner to all nodes and runs them in parallel.
// Exported fields are gob-encoded when the runner is shipped to peer nodes;
// the unexported d is re-attached locally by the receiving Serve.
type distRunner struct {
	Runner
	Addrs      []string // participating node addresses
	MasterAddr string   // the master node that created the distRunner
	d          *distributer
}
// Equals reports whether other is a distRunner created by the same master
// and wrapping an equal inner Runner.
func (r *distRunner) Equals(other interface{}) bool {
	o, ok := other.(*distRunner)
	if !ok {
		return false
	}
	return r.MasterAddr == o.MasterAddr && o.Runner.Equals(r.Runner)
}
// Run distributes the wrapped runner to all participating nodes (only when
// executed on the master node), runs the local part, and then waits for and
// merges error reports from every peer before returning.
func (r *distRunner) Run(ctx context.Context, inp, out chan Dataset) error {
	var errs []error       // local distribution failures (dial/encode)
	var decs []*gob.Decoder // decoders for peers that received the runner

	// only the master distributes; peers skip this loop entirely.
	isMain := r.d.addr == r.MasterAddr
	for i := 0; i < len(r.Addrs) && isMain; i++ {
		addr := r.Addrs[i]
		if addr == r.d.addr {
			continue // don't distribute to ourselves
		}

		conn, err := r.d.Dial("tcp", addr)
		if err != nil {
			errs = append(errs, err)
			break
		}

		// NOTE(review): defer in a loop - all peer connections stay open
		// until Run returns, which is required to later read their replies.
		defer conn.Close()

		err = writeStr(conn, "X") // runner connection
		if err != nil {
			errs = append(errs, err)
			break
		}

		enc := gob.NewEncoder(conn)
		err = enc.Encode(r)
		if err != nil {
			errs = append(errs, err)
			break
		}

		decs = append(decs, gob.NewDecoder(conn))
	}

	// expose the cluster topology to the wrapped runner via the context.
	ctx = context.WithValue(ctx, allNodesKey, r.Addrs)
	ctx = context.WithValue(ctx, masterNodeKey, r.MasterAddr)
	ctx = context.WithValue(ctx, thisNodeKey, r.d.addr)
	ctx = context.WithValue(ctx, distributerKey, r.d)

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// +1 leaves room for the local runner's error alongside one per peer.
	respErrs := make(chan error, len(decs)+1)
	wg := sync.WaitGroup{}

	// start running query iff no errors were detected
	if len(errs) == 0 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			err := r.Runner.Run(ctx, inp, out)
			if err != nil {
				respErrs <- err
			}
		}()
	}

	// collect error responses
	// The final error is transmitted by the Distributer at the end of the remote
	// Run. We need the top-level runner here to make sure we wait for all runners
	// to complete, thus not leaving any open resources/goroutines
	// note decs contains only peers that successfully got distRunner
	for _, dec := range decs {
		wg.Add(1)
		go func(decoder *gob.Decoder) {
			defer wg.Done()
			req := &req{}
			err := decoder.Decode(req)
			data := req.Payload
			if err == nil {
				// decode succeeded - the payload itself may carry the
				// peer's run error (or be nil on success).
				err, _ = data.(error)
			}
			// compare by message: the error crossed a gob boundary, so
			// identity comparisons with io.EOF/errOnPeer won't hold.
			if err != nil && err.Error() != io.EOF.Error() {
				cancel()
				if err.Error() != errOnPeer.Error() {
					respErrs <- err
				}
			}
		}(dec)
	}

	go func() {
		wg.Wait()
		close(respErrs)
	}()

	var finalError error
	// prefer the first meaningful local distribution error.
	for _, e := range errs {
		if e != errOnPeer {
			finalError = e
			break
		}
	}

	// wait for respErrs channel anyway, and select first meaningful error
	for e := range respErrs {
		if finalError == nil {
			finalError = e
		}
	}
	return finalError
}
// write a null-terminated string to a writer
func writeStr(w io.Writer, s string) error {
_, err := w.Write(append([]byte(s), 0))
return err
}
// read a null-terminated string from a reader
func readStr(r io.Reader) (s string, err error) {
b := []byte{0}
for {
_, err = r.Read(b)
if err != nil {
return
} else if b[0] == 0 {
return
}
s += string(b[0])
}
}