-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathexchange_sortgather_test.go
87 lines (72 loc) · 3.13 KB
/
exchange_sortgather_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package ep_test
import (
"fmt"
"github.com/panoplyio/ep"
"github.com/panoplyio/ep/eptest"
"github.com/stretchr/testify/require"
"testing"
)
// TestSortGather runs pipelines ending in ep.SortGather through eptest.RunDist
// and checks the gathered result for ascending, descending and multi-column
// sorting, as well as for a run that is given no input datasets.
func TestSortGather(t *testing.T) {
	// runSortGather builds a Scatter -> nodeAddr -> localSort -> SortGather
	// pipeline, runs it over the given datasets and asserts that the result
	// has 2 columns, 9 rows and formats (via %v) exactly as expected.
	runSortGather := func(t *testing.T, sortingCols []ep.SortingCol, expected string, datasets ...ep.Dataset) {
		// Mark the closure as a helper so a failed assertion is reported at
		// the subtest line that invoked it rather than inside this closure.
		t.Helper()

		runner := ep.Pipeline(ep.Scatter(), &nodeAddr{}, &localSort{SortingCols: sortingCols}, ep.SortGather(sortingCols))
		// NOTE(review): the 2 is presumably the number of nodes used by
		// RunDist - confirm against eptest.
		data, err := eptest.RunDist(t, 2, runner, datasets...)
		require.NoError(t, err)
		require.NotNil(t, data)
		require.Equal(t, 2, data.Width())
		// Every caller below supplies 9 strings in total across its datasets.
		require.Equal(t, 9, data.Len())
		require.Equal(t, expected, fmt.Sprintf("%v", data))
	}

	// This pipeline has no localSort stage and is run without any datasets;
	// the run is expected to produce neither an error nor a dataset.
	t.Run("no data", func(t *testing.T) {
		sortingCols := []ep.SortingCol{{Index: 0, Desc: false}}
		runner := ep.Pipeline(ep.Scatter(), &nodeAddr{}, ep.SortGather(sortingCols))
		data, err := eptest.RunDist(t, 4, runner)
		require.NoError(t, err)
		require.Nil(t, data)
	})

	// Single sorting column, ascending.
	t.Run("asc", func(t *testing.T) {
		sortingCols := []ep.SortingCol{{Index: 0, Desc: false}}
		data1 := ep.NewDataset(strs{"hello", "world", "what"})
		data2 := ep.NewDataset(strs{"bar", "foo", "j", "yes"})
		data3 := ep.NewDataset(strs{"yes", "z"})
		// NOTE(review): the second column (":5551"/":5552") is presumably the
		// node address appended by nodeAddr - confirm against its definition.
		expected := "[[bar foo hello j what world yes yes z] [:5552 :5552 :5552 :5551 :5551 :5552 :5551 :5552 :5551]]"
		runSortGather(t, sortingCols, expected, data1, data2, data3)
	})

	// Single sorting column, descending.
	t.Run("desc", func(t *testing.T) {
		sortingCols := []ep.SortingCol{{Index: 0, Desc: true}}
		data1 := ep.NewDataset(strs{"z", "yes"})
		data2 := ep.NewDataset(strs{"yes", "j", "foo", "bar"})
		data3 := ep.NewDataset(strs{"what", "world", "hello"})
		expected := "[[z yes yes world what j hello foo bar] [:5552 :5552 :5551 :5552 :5552 :5552 :5551 :5551 :5551]]"
		runSortGather(t, sortingCols, expected, data1, data2, data3)
	})

	// Two sorting columns: column 0 ascending, then column 1 descending. The
	// duplicated values ("foo", "yes") are the rows on which the second
	// column's order is visible in the expected output.
	t.Run("multiple columns", func(t *testing.T) {
		sortingCols := []ep.SortingCol{{Index: 0, Desc: false}, {Index: 1, Desc: true}}
		data1 := ep.NewDataset(strs{"foo", "world", "what"})
		data2 := ep.NewDataset(strs{"bar", "foo", "j", "yes"})
		data3 := ep.NewDataset(strs{"yes", "z"})
		expected := "[[bar foo foo j what world yes yes z] [:5552 :5552 :5552 :5551 :5551 :5552 :5552 :5551 :5551]]"
		runSortGather(t, sortingCols, expected, data1, data2, data3)
	})
}
// TestSortGather_error checks that a pipeline ending in ep.SortGather returns
// the error "error failed" from eptest.RunDist when its first stage is a
// dataRunner configured with ThrowOnData: "failed" and one of the input
// datasets contains the value "failed".
func TestSortGather_error(t *testing.T) {
	// runSortGather builds a dataRunner -> Scatter -> nodeAddr -> SortGather
	// pipeline, runs it over the given datasets and asserts that the run fails
	// with exactly the message "error failed".
	runSortGather := func(t *testing.T, sortingCols []ep.SortingCol, datasets ...ep.Dataset) {
		// Mark the closure as a helper so a failed assertion is reported at
		// the subtest line that invoked it rather than inside this closure.
		t.Helper()

		var runner ep.Runner = &dataRunner{ThrowOnData: "failed"}
		runner = ep.Pipeline(runner, ep.Scatter(), &nodeAddr{}, ep.SortGather(sortingCols))
		_, err := eptest.RunDist(t, 2, runner, datasets...)
		require.Error(t, err)
		require.Equal(t, "error failed", err.Error())
	}

	// Two datasets; the second one carries the "failed" value.
	// NOTE(review): which node ends up raising the error depends on how
	// RunDist/Scatter route the datasets - confirm that this input layout
	// really exercises the distributer node.
	t.Run("error from distributer node", func(t *testing.T) {
		sortingCols := []ep.SortingCol{{Index: 0, Desc: true}}
		data2 := ep.NewDataset(strs{"failed"})
		data1 := ep.NewDataset(strs{"z", "yes"})
		runSortGather(t, sortingCols, data1, data2)
	})

	// Three datasets; the "failed" value sits in the middle one.
	// NOTE(review): presumably this layout routes the failing dataset to a
	// peer node - confirm against RunDist/Scatter.
	t.Run("error from peer", func(t *testing.T) {
		sortingCols := []ep.SortingCol{{Index: 0, Desc: true}}
		data1 := ep.NewDataset(strs{"z", "yes"})
		data2 := ep.NewDataset(strs{"failed"})
		data3 := ep.NewDataset(strs{"foo"})
		runSortGather(t, sortingCols, data1, data2, data3)
	})
}