-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathdistribute_test.go
72 lines (56 loc) · 1.95 KB
/
distribute_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package ep_test
import (
"fmt"
"github.com/panoplyio/ep"
"github.com/panoplyio/ep/eptest"
"github.com/stretchr/testify/require"
"testing"
)
// TestDistributer pushes two string datasets through a Scatter -> Gather
// pipeline using eptest.RunDist (the 3 is presumably the number of nodes taking
// part - confirm against eptest) and expects the run to succeed with a single
// output column that prints as all four input strings in their original order.
func TestDistributer(t *testing.T) {
	pipeline := ep.Pipeline(ep.Scatter(), ep.Gather())
	first := ep.NewDataset(strs{"hello", "world"})
	second := ep.NewDataset(strs{"foo", "bar"})

	out, err := eptest.RunDist(t, 3, pipeline, first, second)
	require.NoError(t, err)

	// Exactly one column is expected to come back from the gather step.
	require.Equal(t, 1, out.Width())

	// Compare the printed form of the column rather than its concrete type.
	printed := fmt.Sprintf("%v", out.At(0))
	require.Equal(t, "[hello world foo bar]", printed)
}
// TestDistributer_connectionError creates a single peer and distributes a
// runner over that peer's address plus a second address that this test never
// starts a peer on. The run is expected to fail with the dial error for the
// second address and to return nil data.
func TestDistributer_connectionError(t *testing.T) {
	const (
		peerAddr    = ":5551" // address the test peer is created on
		missingAddr = ":5000" // no peer is created on this address in this test
		wantMessage = "dial tcp :5000: connect: connection refused"
	)

	peer := eptest.NewPeer(t, peerAddr)
	defer eptest.ClosePeer(t, peer)

	distributed := peer.Distribute(&upper{}, peerAddr, missingAddr)
	result, err := eptest.Run(distributed, ep.NewDataset())

	require.Error(t, err)
	require.Equal(t, wantMessage, err.Error())
	require.Nil(t, result)
}
// TestDistributer_Distribute_errorFromPeer tests that errors are transmitted
// across the network: a dataRunner configured with ThrowOnData ":5552" is
// placed after Scatter and nodeAddr in a distributed pipeline, and the error
// it produces is expected to surface from eptest.RunDist as "error :5552".
// The returned data is still expected to report a width of 2.
func TestDistributer_Distribute_errorFromPeer(t *testing.T) {
	// Presumably fails once it sees the ":5552" value - confirm in dataRunner.
	failing := &dataRunner{ThrowOnData: ":5552"}
	pipeline := ep.Pipeline(ep.Scatter(), &nodeAddr{}, failing)

	first := ep.NewDataset(strs{"hello", "world"})
	second := ep.NewDataset(strs{"foo", "bar"})

	out, err := eptest.RunDist(t, 4, pipeline, first, second)
	require.Error(t, err)

	message := err.Error()
	require.Equal(t, "error :5552", message)
	require.Equal(t, 2, out.Width())
}
// TestDistributer_Distribute_ignoreErrIgnorable runs the same distributed
// pipeline once per scenario, each time with a dataRunner that has
// ThrowIgnorable set, and expects eptest.RunDist to report no error. The
// scenario names label ":5552" as the peer and ":5551" as the master.
func TestDistributer_Distribute_ignoreErrIgnorable(t *testing.T) {
	type scenario struct {
		name string
		r    ep.Runner
	}

	scenarios := []scenario{
		{name: "from peer", r: &dataRunner{ThrowOnData: ":5552", ThrowIgnorable: true}},
		{name: "from master", r: &dataRunner{ThrowOnData: ":5551", ThrowIgnorable: true}},
	}

	for i := range scenarios {
		sc := scenarios[i]
		t.Run(sc.name, func(t *testing.T) {
			pipeline := ep.Pipeline(ep.Scatter(), &nodeAddr{}, sc.r)
			first := ep.NewDataset(strs{"hello", "world"})
			second := ep.NewDataset(strs{"foo", "bar"})

			// Only the error matters here; the returned data is discarded.
			_, err := eptest.RunDist(t, 4, pipeline, first, second)
			require.NoError(t, err)
		})
	}
}