-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathexchange_scatter.go
55 lines (48 loc) · 1.33 KB
/
exchange_scatter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package ep
import (
"github.com/satori/go.uuid"
"io"
)
// Scatter builds an exchange Runner of the scatter type. Each input dataset
// is split across the peer nodes, with the pieces handed out to the nodes in
// round-robin order.
//
// Every call tags the returned exchange with a newly generated UUID (v4)
// string as its UID.
func Scatter() Runner {
	// NOTE(review): the error from uuid.NewV4 is discarded here; if generation
	// fails, the UID is the string form of whatever UUID value was returned.
	id, _ := uuid.NewV4()
	ex := &exchange{
		Type: scatter,
		UID:  id.String(),
	}
	return ex
}
// encodeScatter splits data into contiguous slices and sends them one by one
// through encodeNext, so that the rows are spread as evenly as possible over
// the peers.
//
// With n peers and data.Len() rows, the first data.Len()%n sends carry
// data.Len()/n+1 rows each and the remaining sends carry data.Len()/n rows
// each. When there are fewer rows than peers, only data.Len() single-row
// sends are made. Empty data results in no sends at all.
//
// It returns io.ErrClosedPipe when there are no peers to send to, otherwise
// the first error reported by encodeNext (the slices after a failed send are
// not sent), or nil when every slice was encoded.
func (ex *exchange) encodeScatter(data Dataset) error {
	peers := len(ex.encs)
	if peers == 0 {
		// Without this guard the division below panics on zero peers. Report
		// the same error encodeNext uses for an empty encoder list.
		return io.ErrClosedPipe
	}

	total := data.Len()
	batchSize := total / peers
	// The remainder rows are handed out one each to the first `larger` sends.
	larger := total % peers

	start := 0
	// `start < total` stops early when there are fewer rows than peers, so no
	// empty slices are sent.
	for i := 0; i < peers && start < total; i++ {
		end := start + batchSize
		if i < larger {
			end++
		}
		if err := ex.encodeNext(data.Slice(start, end)); err != nil {
			return err
		}
		start = end
	}
	return nil
}
// encodeNext wraps e in a req and encodes it on the next destination encoder,
// cycling through ex.encs in round-robin order. ex.encsNext is advanced by one
// position (wrapping around at the end of ex.encs) before the send, and the
// encoder at the new position receives the request.
//
// It returns io.ErrClosedPipe when ex.encs is empty, otherwise whatever the
// selected encoder's Encode returns.
func (ex *exchange) encodeNext(e interface{}) error {
	peers := len(ex.encs)
	if peers == 0 {
		return io.ErrClosedPipe
	}

	ex.encsNext = (ex.encsNext + 1) % peers
	target := ex.encs[ex.encsNext]
	return target.Encode(&req{e})
}