Skip to content

Commit

Permalink
Simplify constructor by merging chan and slice sources
Browse files Browse the repository at this point in the history
  • Loading branch information
DenWav committed Feb 25, 2019
1 parent 4a1f95e commit d030f48
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 41 deletions.
4 changes: 2 additions & 2 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ func main() {

seen := make([]int, 0)

streams.NewChanStream(c).
streams.NewStream(c).
WithCancel(cancel).
Filter(func(i int) bool {
return streams.NewSliceStream(seen).None(func(n int) bool {
return streams.NewStream(seen).None(func(n int) bool {
return i%n == 0
})
}).
Expand Down
40 changes: 25 additions & 15 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,21 +212,37 @@ type Stream struct {
cancel *[]chan<- bool
}

// NewChanStream creates a new Stream object that uses the provided channel as the source. The first argument to this
// function must be a <-chan R where R is some type. The implicit type of the returned Stream will be R.
// NewStream creates a new Stream object that uses the provided channel or slice as the source. The first argument to
// this function must be either a <-chan R, or []R, where R is some type. The implicit type of the returned Stream will
// be R.
//
// The provided channel may be an infinite value generator. In this case, you must make sure to use limiting functions
// like Take or First to prevent the Stream from processing forever and crashing.
// If using a channel, the provided channel may be an infinite value generator. In this case, you must make sure to use
// limiting functions like Take or First to prevent the Stream from processing forever and crashing.
//
// The generic type signature of this function would be:
//
// <T> func NewChanStream(channel <-chan T, channel ...chan<- bool) *Stream<T>
// <S : []T | <-chan T> func NewStream(source S, channel ...chan<- bool) *Stream<T>
//
// Any arguments provided after the stream are channels which should be used to stop any running goroutine which needs
// Which is to say there is some type S which is either a slice of T ([]T) or a receiving channel of T (<-chan T), which
// would make this return a pointer to a Stream of T's (*Stream<T>).
//
// Any arguments provided after the source are channels which should be used to stop any running goroutine which needs
// to be stopped when processing of the Stream completes. A single 'true' value will be sent to each channel given. The
// send operation will not wait or block, so either define each channel as a buffered channel, or make sure you're
// always listening to it.
func NewChanStream(channel AnyChannel, cancel ...chan<- bool) *Stream {
func NewStream(source AnyType, cancel ...chan<- bool) *Stream {
t := reflect.TypeOf(source)
switch t.Kind() {
case reflect.Slice:
return newSliceStream(source, cancel...)
case reflect.Chan:
return newChanStream(source, cancel...)
default:
panic("provided source is not a slice or channel")
}
}

func newChanStream(channel AnyChannel, cancel ...chan<- bool) *Stream {
return &Stream{func() (AnyType, bool) {
item, ok := chanRecv(channel)
if ok {
Expand All @@ -237,13 +253,7 @@ func NewChanStream(channel AnyChannel, cancel ...chan<- bool) *Stream {
}, &cancel}
}

// NewSliceStream creates a new Stream object that uses the provided slice as the source. Teh first argument to this
// function must be a []R where R is some type. The implicit type of the returned Stream will be R.
//
// The generic type signature for this function would be:
//
// <T> func NewSliceStream(slice []T) *Stream<T>
func NewSliceStream(slice AnySlice) *Stream {
func newSliceStream(slice AnySlice, cancel ...chan<- bool) *Stream {
index := 0
return &Stream{func() (AnyType, bool) {
if index < sliceLen(slice) {
Expand All @@ -252,7 +262,7 @@ func NewSliceStream(slice AnySlice) *Stream {
return item, true
}
return nil, false
}, &[]chan<- bool{}}
}, &cancel}
}

func callFunc(f interface{}, args ...reflect.Value) []reflect.Value {
Expand Down
48 changes: 24 additions & 24 deletions stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestStream_All(t *testing.T) {

s := []string{"hello", "world"}

res := streams.NewSliceStream(s).
res := streams.NewStream(s).
Filter(func(word string) bool {
return strings.Contains(word, "or")
}).
Expand Down Expand Up @@ -46,7 +46,7 @@ func TestStream(t *testing.T) {
close(c)
}()

res := streams.NewChanStream(c).
res := streams.NewStream(c).
Filter(func(word string) bool {
return strings.Contains(word, "el")
}).
Expand Down Expand Up @@ -87,7 +87,7 @@ func Test(t *testing.T) {
}
}()

sum := streams.NewChanStream(c).
sum := streams.NewStream(c).
WithCancel(cancel).
Map(func(i int) int {
return i * 2
Expand Down Expand Up @@ -120,7 +120,7 @@ func TestDistinct(t *testing.T) {
}
}()

sum := streams.NewChanStream(c, cancel).
sum := streams.NewStream(c, cancel).
Map(func(i int) int {
return i * 2
}).
Expand Down Expand Up @@ -151,7 +151,7 @@ func TestSort(t *testing.T) {
}()

var res []int
streams.NewChanStream(c, cancel).
streams.NewStream(c, cancel).
Take(10000).
Sort(func(left, right int) bool {
return left < right
Expand All @@ -165,7 +165,7 @@ func TestAvg(t *testing.T) {
defer goleak.VerifyNoLeaks(t)

data := []int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
avg := streams.NewSliceStream(data).
avg := streams.NewStream(data).
AvgInt(func(f int64) int64 {
return f
})
Expand All @@ -177,7 +177,7 @@ func TestCount(t *testing.T) {
defer goleak.VerifyNoLeaks(t)

data := []int{0, 1, 2}
count := streams.NewSliceStream(data).
count := streams.NewStream(data).
Filter(func(i int) bool {
return i != 1
}).
Expand All @@ -192,7 +192,7 @@ func TestForEach(t *testing.T) {
data := []int{0, 1, 2}
var output []int

streams.NewSliceStream(data).
streams.NewStream(data).
ForEach(func(i int) {
output = append(output, i)
})
Expand Down Expand Up @@ -223,7 +223,7 @@ func TestToChan(t *testing.T) {
}
}()

streams.NewSliceStream(data).
streams.NewStream(data).
WithCancel(q).
ToChan(c)

Expand All @@ -237,7 +237,7 @@ func TestSkip(t *testing.T) {
expected := []int{2}
var output []int

streams.NewSliceStream(data).
streams.NewStream(data).
Skip(2).
ToSlice(&output)

Expand All @@ -251,7 +251,7 @@ func TestOnEach(t *testing.T) {
var output1 []int
var output2 []int

streams.NewSliceStream(data).
streams.NewStream(data).
OnEach(func(i int) {
output2 = append(output2, i)
}).
Expand All @@ -270,7 +270,7 @@ func TestMin(t *testing.T) {

var val int

streams.NewSliceStream(data).
streams.NewStream(data).
Min(&val, func(left, right int) bool {
return left < right
})
Expand All @@ -285,7 +285,7 @@ func TestMax(t *testing.T) {

var val int

streams.NewSliceStream(data).
streams.NewStream(data).
Max(&val, func(left, right int) bool {
return left < right
})
Expand All @@ -300,7 +300,7 @@ func TestReduce(t *testing.T) {

var res map[rune]int

streams.NewSliceStream([]string{data}).
streams.NewStream([]string{data}).
SliceFlatMap(func(line string) []rune {
return []rune(line)
}).
Expand Down Expand Up @@ -335,12 +335,12 @@ func TestReduceMin(t *testing.T) {
var valMin int
var valReduce int

streams.NewSliceStream(data).
streams.NewStream(data).
Min(&valMin, func(left, right int) bool {
return left < right
})

streams.NewSliceStream(data).
streams.NewStream(data).
Reduce(&valReduce, int(math.MaxInt32), func(item, out int) int {
if item < out {
return item
Expand All @@ -359,9 +359,9 @@ func TestConcat(t *testing.T) {

var out []int

streams.NewSliceStream(data).
Concat(streams.NewSliceStream(data)).
Concat(streams.NewSliceStream(data)).
streams.NewStream(data).
Concat(streams.NewStream(data)).
Concat(streams.NewStream(data)).
ToSlice(&out)

var expected []int
Expand All @@ -379,8 +379,8 @@ func TestZip(t *testing.T) {

var out []int

streams.NewSliceStream(data).
Zip(streams.NewSliceStream(data), 0, func(left, right int) int {
streams.NewStream(data).
Zip(streams.NewStream(data), 0, func(left, right int) int {
return left + right
}).
ToSlice(&out)
Expand All @@ -400,9 +400,9 @@ func TestMismatchedZip(t *testing.T) {

var out []int

streams.NewSliceStream(data).
Concat(streams.NewSliceStream(data)).
Zip(streams.NewSliceStream(data), 0, func(left, right int) int {
streams.NewStream(data).
Concat(streams.NewStream(data)).
Zip(streams.NewStream(data), 0, func(left, right int) int {
return left + right
}).
ToSlice(&out)
Expand Down

0 comments on commit d030f48

Please sign in to comment.