Skip to content

Commit

Permalink
Add Reduce functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
DenWav committed Feb 23, 2019
1 parent b9346be commit d001a3b
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 38 deletions.
6 changes: 3 additions & 3 deletions slices.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ import (
"reflect"
)

func checkSlice(slice interface{}) reflect.Type {
func checkSlice(slice AnySlice) reflect.Type {
t := reflect.TypeOf(slice)
if t.Kind() != reflect.Slice {
panic(errors.New("provided type is not a slice"))
}
return t
}

func sliceIndex(slice interface{}, index int) interface{} {
func sliceIndex(slice AnySlice, index int) AnyType {
t := checkSlice(slice)

if t.Elem().Kind() == reflect.Interface {
Expand All @@ -25,7 +25,7 @@ func sliceIndex(slice interface{}, index int) interface{} {
return val.Index(index).Interface()
}

func sliceLength(slice interface{}) int {
func sliceLen(slice AnySlice) int {
checkSlice(slice)

val := reflect.ValueOf(slice)
Expand Down
159 changes: 124 additions & 35 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,32 @@ type MapToIntFunction interface{}
// panic will occur.
type MapToFloatFunction interface{}

// AccumulatorFunction is an empty stand-in type for a generic function with a type signature as
//
// <T, U> func(left T, right U) U
//
// Where there is some type T as the first parameter, and some type U as the second parameter as input to the function,
// and the same type U as output. If this type signature is not maintained where this function is used, a panic will
// occur.
type AccumulatorFunction interface{}

// AnyType is an empty stand-in type for any type. Unlike other places in this library where interface{} is used to
// mimic generic functions, any place this is used signifies the element may be a value of any particular type, as long
// as the type is compatible with where it is used as defined by the documentation.
type AnyType interface{}

// AnySlice is an empty stand-in type for a slice which contains any element type. When this type is used in this
// library, it represents a type of []T, where T is any type. Note that this is not the same thing as []interface{}, as
// []interface{} specifies a particular memory layout which is different from other types. The element values of the
// slice must be compatible with where it is used as defined by the documentation.
type AnySlice interface{}

// AnyChannel is an empty stand-in type for a channel which deals with any element type. When this type is used in this
// library, it represents a type of chan T, where T is any type. Note that this is not the same thing as
// chan interface{}, as chan interface{} specifies a particular memory layout which is different from other types. The
// element values of the channel must be compatible with where it is used as defined by the documentation.
type AnyChannel interface{}

// Stream represents a lazily evaluated chain of functions which operates on some source of values. Items are computed
// as they are asked for and as they go through the Stream pipeline, so if an items doesn't need to be processed by
// later parts of the Stream, it is skipped.
Expand Down Expand Up @@ -173,7 +199,7 @@ type MapToFloatFunction interface{}
// Stream returned from 'Map()' on line 3 has an implicit type of 'int', so the final Stream assigned to 's' also has
// an implicit type of 'int'.
type Stream struct {
next func() (interface{}, bool)
next func() (AnyType, bool)
cancel *[]chan<- bool
}

Expand All @@ -187,8 +213,8 @@ type Stream struct {
// to be stopped when processing of the Stream completes. A single 'true' value will be sent to each channel given. The
// send operation will not wait or block, so either define each channel as a buffered channel, or make sure you're
// always listening to it.
func NewChanStream(channel interface{}, cancel ...chan<- bool) *Stream {
return &Stream{func() (interface{}, bool) {
func NewChanStream(channel AnyChannel, cancel ...chan<- bool) *Stream {
return &Stream{func() (AnyType, bool) {
item, ok := chanRecv(channel)
if ok {
return item, true
Expand All @@ -200,10 +226,10 @@ func NewChanStream(channel interface{}, cancel ...chan<- bool) *Stream {

// NewSliceStream creates a new Stream object that uses the provided slice as the source. Teh first argument to this
// function must be a []R where R is some type. The implicit type of the returned Stream will be R.
func NewSliceStream(slice interface{}) *Stream {
func NewSliceStream(slice AnySlice) *Stream {
index := 0
return &Stream{func() (interface{}, bool) {
if index < sliceLength(slice) {
return &Stream{func() (AnyType, bool) {
if index < sliceLen(slice) {
item := sliceIndex(slice, index)
index++
return item, true
Expand Down Expand Up @@ -243,7 +269,7 @@ func (s *Stream) finish() {
// type signature isn't correct, a panic will occur. The return type of this mapping function determines the new
// type for the elements in the returned Stream.
func (s *Stream) Map(mapperFunc MapFunction) *Stream {
return &Stream{func() (interface{}, bool) {
return &Stream{func() (AnyType, bool) {
n, more := s.next()
if !more {
return nil, false
Expand All @@ -263,7 +289,7 @@ func (s *Stream) Map(mapperFunc MapFunction) *Stream {
// And the input type must be compatible with every element in the Stream that makes it to this function. If this
// type signature isn't correct, a panic will occur.
func (s *Stream) Filter(filterFunc FilterFunction) *Stream {
return &Stream{func() (interface{}, bool) {
return &Stream{func() (AnyType, bool) {
n, more := s.next()
for more {
if callFunc(filterFunc, reflect.ValueOf(n))[0].Interface().(bool) {
Expand Down Expand Up @@ -322,7 +348,7 @@ func (s *Stream) ChanFlatMap(mapperFunc ChanMapFunction) *Stream {
return next, false, true
}

return &Stream{func() (interface{}, bool) {
return &Stream{func() (AnyType, bool) {
res, retry, more := nextItem()
if !more {
return nil, false
Expand Down Expand Up @@ -356,25 +382,47 @@ func (s *Stream) ChanFlatMap(mapperFunc ChanMapFunction) *Stream {
//
// then the returned Stream will process elements of type rune.
func (s *Stream) SliceFlatMap(mapperFunc SliceMapFunction) *Stream {
return s.ChanFlatMap(func(item interface{}) <-chan interface{} {
if item == nil {
return nil
}

slice := callFunc(mapperFunc, reflect.ValueOf(item))[0].Interface()
resChan := make(chan interface{})
var currentSlice AnySlice
currentIndex := 0
sliceLength := 0

go func() {
length := sliceLength(slice)
for i := 0; i < length; i++ {
el := sliceIndex(slice, i)
resChan <- el
nextItem := func() (res interface{}, retry, more bool) {
if currentSlice == nil {
item, more := s.next()
if !more {
return nil, false, false
}
close(resChan)
}()
currentSlice = callFunc(mapperFunc, reflect.ValueOf(item))[0].Interface()
sliceLength = sliceLen(currentSlice)
}
if currentSlice == nil {
return nil, false, true
}
if currentIndex < sliceLength {
res := sliceIndex(currentSlice, currentIndex)
currentIndex++
return res, false, true
} else {
currentIndex = 0
sliceLength = 0
currentSlice = nil
return nil, true, true
}
}

return resChan
})
return &Stream{func() (AnyType, bool) {
res, retry, more := nextItem()
if !more {
return nil, false
}
for retry {
res, retry, more = nextItem()
if !more {
return nil, false
}
}
return res, true
}, s.cancel}
}

// Take returns a Stream that only passes along the first n elements it sees. After either the source Stream stops
Expand All @@ -384,7 +432,7 @@ func (s *Stream) SliceFlatMap(mapperFunc SliceMapFunction) *Stream {
// either call this function or call First to prevent the final Stream from continually processing items.
func (s *Stream) Take(n int) *Stream {
count := 0
return &Stream{func() (interface{}, bool) {
return &Stream{func() (AnyType, bool) {
if count >= n {
return nil, false
}
Expand All @@ -402,7 +450,7 @@ func (s *Stream) Take(n int) *Stream {
// sees n elements, this Stream will never pass along any items.
func (s *Stream) Skip(n int) *Stream {
count := 0
return &Stream{func() (interface{}, bool) {
return &Stream{func() (AnyType, bool) {
if count >= n {
return s.next()
}
Expand All @@ -425,7 +473,7 @@ func (s *Stream) Skip(n int) *Stream {
func (s *Stream) Distinct() *Stream {
m := make(map[interface{}]bool)

return &Stream{func() (interface{}, bool) {
return &Stream{func() (AnyType, bool) {
for {
item, more := s.next()
if !more {
Expand All @@ -442,7 +490,7 @@ func (s *Stream) Distinct() *Stream {

type sortable struct {
data []interface{}
compFunc interface{}
compFunc CompareFunction
}

func (s *sortable) Len() int {
Expand Down Expand Up @@ -490,7 +538,7 @@ func (s *Stream) Sort(lessFunc CompareFunction) *Stream {
sorted = sortableData.data
}

return &Stream{func() (interface{}, bool) {
return &Stream{func() (AnyType, bool) {
if sorted == nil {
doSort()
}
Expand All @@ -516,7 +564,7 @@ func (s *Stream) Sort(lessFunc CompareFunction) *Stream {
// And the input type T must be compatible with ever element in the Stream that makes it to this function. If this type
// signature isn't correct, a panic will occur.
func (s *Stream) OnEach(voidFunc VoidFunction) *Stream {
return &Stream{func() (interface{}, bool) {
return &Stream{func() (AnyType, bool) {
n, more := s.next()
if !more {
return nil, false
Expand Down Expand Up @@ -560,7 +608,7 @@ func (s *Stream) First(filterFunc FilterFunction) (interface{}, bool) {
// ToSlice fills the given slice with the elements in the Stream. The slice type must be compatible with every item
// in the Stream. The input of this function must be a pointer to the slice, rather than the slice itself, so the
// slice may be resized as necessary.
func (s *Stream) ToSlice(t interface{}) {
func (s *Stream) ToSlice(t AnySlice) {
defer s.finish()

sliceValue := reflect.ValueOf(t).Elem()
Expand Down Expand Up @@ -675,7 +723,7 @@ func (s *Stream) ForEach(voidFunc VoidFunction) {
// occur.
//
// When no more items are to be sent to the channel, the given channel will be closed.
func (s *Stream) ToChan(channel interface{}) {
func (s *Stream) ToChan(channel AnyChannel) {
defer s.finish()

t := reflect.TypeOf(channel)
Expand Down Expand Up @@ -814,7 +862,10 @@ func (s *Stream) AvgFloat(mapperFunc MapToFloatFunction) float64 {
//
// The given function should return true if the left parameter should be considered smaller, or should come before, the
// right parameter.
func (s *Stream) Min(output interface{}, lessFunc CompareFunction) {
//
// Output must be of type *T, that is a pointer to T. The resulting minimum value from this Stream will be assigned to
// this pointer.
func (s *Stream) Min(output AnyType, lessFunc CompareFunction) {
defer s.finish()

var smallest *reflect.Value
Expand Down Expand Up @@ -859,8 +910,46 @@ func (s *Stream) Min(output interface{}, lessFunc CompareFunction) {
//
// The given function should return true if the left parameter should be considered smaller, or should come before, the
// right parameter.
func (s *Stream) Max(output interface{}, lessFunc CompareFunction) {
//
// Output must be of type *T, that is a pointer to T. The resulting maximum value from this Stream will be assigned to
// this pointer.
func (s *Stream) Max(output AnyType, lessFunc CompareFunction) {
s.Min(output, func(left, right interface{}) bool {
return callFunc(lessFunc, reflect.ValueOf(right), reflect.ValueOf(left))[0].Bool()
})
}

// Reduce combines the elements of this Stream into a single result using the provided accumulator function and a
// beginning identity value. This function signature, in a generic form, would look like:
//
// <U> func (s *Stream<T>) Reduce(output *U, identity U, accumulator func(T, U) U)
//
// Which is to say that if the implicit type of this Stream is T, the accumulator function takes in T as the first
// parameter, and take in some resultant type U as the second parameter, and also returns this same type U as the
// output. An identity value for this type U is provided as the second argument, which wil lbe the first value passed
// as the second argument of the accumulator function. After calling the accumulator function the first time, the result
// of the accumulator function will be used instead.
//
// The output must be a pointer to a result value, also of this output type U. The resulting reduced value from this
// Stream will be assigned to this pointer.
func (s *Stream) Reduce(output AnyType, identity AnyType, accumulator AccumulatorFunction) {
defer s.finish()

res := reflect.ValueOf(identity)

for i := 0; i < 50; i++ {
item, more := s.next()
if !more {
break
}

res = callFunc(accumulator, reflect.ValueOf(item), res)[0]
}

t := reflect.TypeOf(output)
if t.Kind() != reflect.Ptr {
panic(errors.New("provided output type is not a pointer"))
}

reflect.ValueOf(output).Elem().Set(res)
}
60 changes: 60 additions & 0 deletions stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/DemonWav/go-streams"
"github.com/stretchr/testify/assert"
"go.uber.org/goleak"
"math"
"math/rand"
"sort"
"strings"
Expand Down Expand Up @@ -291,3 +292,62 @@ func TestMax(t *testing.T) {

assert.Equal(t, 6, val)
}

func TestReduce(t *testing.T) {
defer goleak.VerifyNoLeaks(t)

data := "The quick brown fox jumped over the lazy sheep dog."

var res map[rune]int

streams.NewSliceStream([]string{data}).
SliceFlatMap(func(line string) []rune {
return []rune(line)
}).
Filter(func(char rune) bool {
return unicode.IsLetter(char)
}).
Map(func(char rune) rune {
return unicode.ToLower(char)
}).
Reduce(&res, make(map[rune]int), func(char rune, m map[rune]int) map[rune]int {
m[char]++
return m
})

// The expected value hilariously shows how in-applicable this problem is to the above solution :D
exp := make(map[rune]int)
for _, c := range data {
if !unicode.IsLetter(c) {
continue
}
exp[unicode.ToLower(c)]++
}

assert.Equal(t, exp, res)
}

func TestReduceMin(t *testing.T) {
defer goleak.VerifyNoLeaks(t)

data := []int{1, 6, -5}

var valMin int
var valReduce int

streams.NewSliceStream(data).
Min(&valMin, func(left, right int) bool {
return left < right
})

streams.NewSliceStream(data).
Reduce(&valReduce, int(math.MaxInt32), func(item, out int) int {
if item < out {
return item
} else {
return out
}
})

assert.Equal(t, valMin, valReduce)
}

0 comments on commit d001a3b

Please sign in to comment.