diff --git a/README.md b/README.md index 45b5a51..ca60540 100644 --- a/README.md +++ b/README.md @@ -22,12 +22,11 @@ Pipeline( ### Source -A source component should be responsible for ingesting data from some resilient data store or streams. Sources can either be chained or joined. Chaining sources is useful when you need to combine data from multiple data stores or streams, in a sequence. Joining is useful when you can collect the data from the sources in parallel and you want to merge the data into a single entity. +A source component should be responsible for ingesting data from some resilient data store or streams. Sources can either be chained or joined. Chaining sources is useful when you need to combine data from multiple data stores or streams, in a sequence. #### Source Operators - ```SequenceSource(Source[T], ChainedSource[T,V]) -> Source[V]```: Used to chain together two dependent sources, the second source is a special source implementation that takes the result returned from the first source and passes it to the second -- ```Join(Source[T], Source[V], func(T, V) -> K) -> Source[K]```: Used to execute two sources concurrently and join the results using a joining function into a new data source type. 
### Stage diff --git a/etl/pipeline/filter.go b/etl/pipeline/filter.go deleted file mode 100644 index 9160b3c..0000000 --- a/etl/pipeline/filter.go +++ /dev/null @@ -1,17 +0,0 @@ -package pipeline - -import ( - "context" -) - -func Filter[T any](filterFunc func(context.Context, T) bool) Stage[T, T] { - return func(ctx context.Context, t []T) ([]T, error) { - filteredArr := []T{} - for _, value := range t { - if filterFunc(ctx, value) { - filteredArr = append(filteredArr, value) - } - } - return filteredArr, nil - } -} diff --git a/etl/pipeline/join.go b/etl/pipeline/join.go deleted file mode 100644 index 3654dfd..0000000 --- a/etl/pipeline/join.go +++ /dev/null @@ -1,120 +0,0 @@ -package pipeline - -import "context" - -type JoinType int - -const ( - JoinLeft JoinType = iota - JoinRight - JoinLeftSingle - JoinRightSingle -) - -type JoinFunc[T, V, K any] func(ctx context.Context, left T, right V) K - -func JoinSource[T, V, K any]( - left Source[T], - right Source[V], - joinFunc JoinFunc[T, V, K], -) Source[K] { - return func(ctx context.Context) ([]K, error) { - leftValues, err := left(ctx) - if err != nil { - return nil, err - } - rightValues, err := right(ctx) - if err != nil { - return nil, err - } - - joinType := validate( - leftValues, - rightValues, - ) - - switch joinType { - case JoinLeft: - return joinLeft( - ctx, - leftValues, - rightValues, - joinFunc, - ), nil - case JoinLeftSingle: - return joinLeftSingle( - ctx, - leftValues, - rightValues, - joinFunc, - ), nil - case JoinRight: - return joinRight( - ctx, - leftValues, - rightValues, - joinFunc, - ), nil - case JoinRightSingle: - return joinRightSingle( - ctx, - leftValues, - rightValues, - joinFunc, - ), nil - } - - return nil, nil - } -} - -func validate[T, V any](left []T, right []V) JoinType { - leftLen := len(left) - rightLen := len(right) - - if leftLen == 1 { - return JoinLeftSingle - } - - if rightLen == 1 { - return JoinRightSingle - } - - if leftLen > rightLen { - return JoinLeft - } - - 
return JoinRight -} - -func joinLeft[T, V, K any](ctx context.Context, left []T, right []V, joinFunc JoinFunc[T, V, K]) []K { - ks := []K{} - for rightIdx, rightValue := range right { - ks = append(ks, joinFunc(ctx, left[rightIdx], rightValue)) - } - return ks -} - -func joinLeftSingle[T, V, K any](ctx context.Context, left []T, right []V, joinFunc JoinFunc[T, V, K]) []K { - ks := []K{} - for _, rightValue := range right { - ks = append(ks, joinFunc(ctx, left[0], rightValue)) - } - return ks -} - -func joinRight[T, V, K any](ctx context.Context, left []T, right []V, joinFunc JoinFunc[T, V, K]) []K { - ks := []K{} - for leftIdx, leftValue := range left { - ks = append(ks, joinFunc(ctx, leftValue, right[leftIdx])) - } - return ks -} - -func joinRightSingle[T, V, K any](ctx context.Context, left []T, right []V, joinFunc JoinFunc[T, V, K]) []K { - ks := []K{} - for _, leftValue := range left { - ks = append(ks, joinFunc(ctx, leftValue, right[0])) - } - return ks -} diff --git a/etl/pipeline/map.go b/etl/pipeline/map.go deleted file mode 100644 index 8caf692..0000000 --- a/etl/pipeline/map.go +++ /dev/null @@ -1,13 +0,0 @@ -package pipeline - -import "context" - -func Map[T, V any](mapFunc func(context.Context, T) V) Stage[T, V] { - return func(ctx context.Context, t []T) ([]V, error) { - mappedArr := []V{} - for _, value := range t { - mappedArr = append(mappedArr, mapFunc(ctx, value)) - } - return mappedArr, nil - } -} diff --git a/etl/pipeline/pipeline.go b/etl/pipeline/pipeline.go index 6c9e857..bdcc9d6 100644 --- a/etl/pipeline/pipeline.go +++ b/etl/pipeline/pipeline.go @@ -3,15 +3,15 @@ package pipeline import "context" type Pipeline[T, V any] struct { - source Source[T] - stage Stage[T, V] - sink Sink[V] + source source[T] + stage stage[T, V] + sink sink[V] } func NewPipeline[T, V any]( - source Source[T], - stage Stage[T, V], - sink Sink[V], + source source[T], + stage stage[T, V], + sink sink[V], ) *Pipeline[T, V] { return &Pipeline[T, V]{ source: source, @@ 
-21,15 +21,15 @@ func NewPipeline[T, V any]( } func (p *Pipeline[T, V]) Execute(ctx context.Context) error { - values, err := p.source(ctx) + sourceChannel, err := p.source(ctx) if err != nil { return err } - stageValues, err := p.stage(ctx, values) + stageChannel, err := p.stage(ctx, sourceChannel) if err != nil { return err } - return p.sink(ctx, stageValues) + return p.sink(ctx, stageChannel) } diff --git a/etl/pipeline/sequence.go b/etl/pipeline/sequence.go deleted file mode 100644 index 4fa8310..0000000 --- a/etl/pipeline/sequence.go +++ /dev/null @@ -1,42 +0,0 @@ -package pipeline - -import "context" - -func SequenceSource[T, V any]( - s1 Source[T], - s2 ChainedSource[T, V], -) Source[V] { - return func(ctx context.Context) ([]V, error) { - ts, err := s1(ctx) - if err != nil { - return nil, err - } - return s2(ctx, ts)(ctx) - } -} - -func SequenceStage[T, V, K any]( - s1 Stage[T, V], - s2 Stage[V, K], -) Stage[T, K] { - return func(ctx context.Context, t []T) ([]K, error) { - ks, err := s1(ctx, t) - if err != nil { - return nil, err - } - return s2(ctx, ks) - } -} - -func SequenceSink[T any]( - sinks ...Sink[T], -) Sink[T] { - return func(ctx context.Context, t []T) error { - for _, sink := range sinks { - if err := sink(ctx, t); err != nil { - return err - } - } - return nil - } -} diff --git a/etl/pipeline/sink.go b/etl/pipeline/sink.go index a67e482..0496f20 100644 --- a/etl/pipeline/sink.go +++ b/etl/pipeline/sink.go @@ -2,4 +2,64 @@ package pipeline import "context" -type Sink[T any] func(context.Context, []T) error +type sink[T any] func(context.Context, chan T) error + +func Emit[T any](emitFunc func(context.Context, T) error) sink[T] { + return func(ctx context.Context, inChannel chan T) error { + // collect values from channel + for inValue := range inChannel { + if err := emitFunc(ctx, inValue); err != nil { + return err + } + } + + return nil + } +} + +func Spread[T any](sinks ...sink[T]) sink[T] { + // create list of channels for sinks + 
sinkChannels := make([]chan T, len(sinks)) + + return func(ctx context.Context, inChannel chan T) error { + // collect input values from channel + inValues := make([]T, len(inChannel)) + idx := 0 + for inValue := range inChannel { + inValues[idx] = inValue + idx++ + } + + // construct sink channel for each sink and load input values + for range sinks { + sinkChannel := make(chan T, len(inChannel)) + for _, inValue := range inValues { + sinkChannel <- inValue + } + close(sinkChannel) + sinkChannels = append(sinkChannels, sinkChannel) + } + + // execute sinks + for idx, sink := range sinks { + if err := sink(ctx, sinkChannels[idx]); err != nil { + return err + } + } + + return nil + } +} + +func SequenceSink[T any]( + sinks ...sink[T], +) sink[T] { + return func(ctx context.Context, c chan T) error { + for _, sink := range sinks { + if err := sink(ctx, c); err != nil { + return err + } + } + return nil + } +} diff --git a/etl/pipeline/source.go b/etl/pipeline/source.go index 38ce808..8fb9d9c 100644 --- a/etl/pipeline/source.go +++ b/etl/pipeline/source.go @@ -4,6 +4,120 @@ import ( "context" ) -type Source[T any] func(ctx context.Context) ([]T, error) +type source[T any] func(ctx context.Context) (chan T, error) +type chainedSource[T, V any] func(context.Context, chan T) (chan V, error) -type ChainedSource[T, V any] func(context.Context, []T) func(context.Context) ([]V, error) +func SourceBatch[T any](sourceFunc func(context.Context) ([]T, error)) source[T] { + // return function to be executed during pipeline execution + return func(ctx context.Context) (chan T, error) { + // execute source + batch, err := sourceFunc(ctx) + if err != nil { + return nil, err + } + + // create batch channel + batchChan := make(chan T, len(batch)) + + // load batch into channel if nothing went wrong + for _, element := range batch { + batchChan <- element + } + close(batchChan) + + return batchChan, nil + } +} + +func ChainedSourceBatch[T, V any](sourceFunc 
func(context.Context, []T) ([]V, error)) chainedSource[T, V] { + return func(ctx context.Context, inChannel chan T) (chan V, error) { + inValues := make([]T, len(inChannel)) + idx := 0 + for inValue := range inChannel { + inValues[idx] = inValue + idx++ + } + + outValues, err := sourceFunc(ctx, inValues) + if err != nil { + return nil, err + } + + outChannel := make(chan V, len(outValues)) + for _, outValue := range outValues { + outChannel <- outValue + } + close(outChannel) + + return outChannel, nil + } +} + +func SourceScalar[T any](source func(context.Context) (T, error)) source[T] { + // return function to be executed during pipeline execution + return func(ctx context.Context) (chan T, error) { + // create synchronous channel + scalarChan := make(chan T, 1) + + // execute source + scalar, err := source(ctx) + if err != nil { + return nil, err + } + + // load scalar value into channel if nothing went wrong + scalarChan <- scalar + close(scalarChan) + + return scalarChan, err + } +} + +func ChainedSourceScalar[T, V any](sourceFunc func(context.Context, T) ([]V, error)) chainedSource[T, V] { + return func(ctx context.Context, inChannel chan T) (chan V, error) { + // reading all values to prevent leak + inValues := make([]T, len(inChannel)) + idx := 0 + for inValue := range inChannel { + inValues[idx] = inValue + idx++ + } + + // call source with first inValue + outValues, err := sourceFunc(ctx, inValues[0]) + if err != nil { + return nil, err + } + + // collect output + outChannel := make(chan V, len(outValues)) + for _, outValue := range outValues { + outChannel <- outValue + } + close(outChannel) + + return outChannel, nil + } +} + +func SequenceSource[T, V any](source1 source[T], source2 chainedSource[T, V]) source[V] { + return func(ctx context.Context) (chan V, error) { + // execute source1 to obtain handle to channel + source1Chan, err := source1(ctx) + if err != nil { + return nil, err + } + + // execute source2 given source1 + source2Chan, err := 
source2(ctx, source1Chan) + + // load source2 data into channel + chainChannel := make(chan V, len(source2Chan)) + for source2Value := range source2Chan { + chainChannel <- source2Value + } + close(chainChannel) + + return chainChannel, err + } +} diff --git a/etl/pipeline/spread.go b/etl/pipeline/spread.go deleted file mode 100644 index 511d9b0..0000000 --- a/etl/pipeline/spread.go +++ /dev/null @@ -1,14 +0,0 @@ -package pipeline - -import "context" - -func Spread[T any](sinks ...Sink[T]) func(ctx context.Context, t []T) error { - return func(ctx context.Context, t []T) error { - for _, sink := range sinks { - if err := sink(ctx, t); err != nil { - return err - } - } - return nil - } -} diff --git a/etl/pipeline/stage.go b/etl/pipeline/stage.go index 060a9a0..b4e09f3 100644 --- a/etl/pipeline/stage.go +++ b/etl/pipeline/stage.go @@ -4,4 +4,57 @@ import ( "context" ) -type Stage[T any, V any] func(context.Context, []T) ([]V, error) +type stage[T any, V any] func(context.Context, chan T) (chan V, error) + +func Map[T, V any](mapFunc func(context.Context, T) V) stage[T, V] { + return func(ctx context.Context, inChannel chan T) (chan V, error) { + // create channel with buffer for each element in inChannel + mapChan := make(chan V, len(inChannel)) + + // map values from inChannel + for inValue := range inChannel { + mapChan <- mapFunc(ctx, inValue) + } + close(mapChan) + + return mapChan, nil + } +} + +func Filter[T any](filterFunc func(context.Context, T) bool) stage[T, T] { + return func(ctx context.Context, inChannel chan T) (chan T, error) { + // optimistically allocate buffer for all elements in input channel + filterChan := make(chan T, len(inChannel)) + + // filter values from inChannel + for inValue := range inChannel { + if filterFunc(ctx, inValue) { + filterChan <- inValue + } + } + + return filterChan, nil + } +} + +func SequenceStage[T, V, K any]( + stage1 stage[T, V], + stage2 stage[V, K], +) stage[T, K] { + return func(ctx context.Context, inChannel chan 
T) (chan K, error) { + + // execute stage 1 + stage1Channel, err := stage1(ctx, inChannel) + if err != nil { + return nil, err + } + + // execute stage 2 + stage2Channel, err := stage2(ctx, stage1Channel) + if err != nil { + return nil, err + } + + return stage2Channel, err + } +} diff --git a/examples/collector.go b/examples/collector.go index a0d1bd8..0457967 100644 --- a/examples/collector.go +++ b/examples/collector.go @@ -1,4 +1,4 @@ -package examples +package main import "context" diff --git a/examples/collectorImpl.go b/examples/collectorImpl.go index 068ffd3..2057715 100644 --- a/examples/collectorImpl.go +++ b/examples/collectorImpl.go @@ -1,4 +1,4 @@ -package examples +package main import ( "context" diff --git a/examples/emitter.go b/examples/emitter.go index 7678151..6d34d4a 100644 --- a/examples/emitter.go +++ b/examples/emitter.go @@ -1,7 +1,7 @@ -package examples +package main import "context" -type DataEmitter interface { - Emit(ctx context.Context, data []byte) error +type DataEmitter[T any] interface { + Emit(ctx context.Context, value T) error } diff --git a/examples/main.go b/examples/main.go index 9095a10..f97577a 100644 --- a/examples/main.go +++ b/examples/main.go @@ -1,10 +1,40 @@ -package examples +package main + +import ( + "context" + "log" + + "github.com/meshtrade/mesh-etl/etl/pipeline" +) type Model struct { - Value string - Ascii []rune + Value int } -func Main() { +func main() { + pipeline := pipeline.NewPipeline( + pipeline.SequenceSource( + pipeline.SourceScalar(func(ctx context.Context) (int, error) { + return 0, nil + }), + pipeline.ChainedSourceScalar(func(ctx context.Context, in int) ([]int, error) { + return []int{1, 2, 3, 4}, nil + }), + ), + pipeline.SequenceStage( + pipeline.Map(func(ctx context.Context, inValue int) int { + return inValue * inValue + }), + pipeline.Map(func(ctx context.Context, inValue int) Model { + return Model{ + Value: inValue, + } + }), + ), + pipeline.Emit(NewSTDOutEmitter[Model]().Emit), + ) + if 
err := pipeline.Execute(context.Background()); err != nil { + log.Fatal(err) + } } diff --git a/examples/stdoutEmitter.go b/examples/stdoutEmitter.go index 8e69f2a..249dc5f 100644 --- a/examples/stdoutEmitter.go +++ b/examples/stdoutEmitter.go @@ -1,20 +1,18 @@ -package examples +package main import ( "context" "fmt" ) -var _ DataEmitter = &StdOutEmitter{} - -type StdOutEmitter struct { +type StdOutEmitter[T any] struct { } -func NewSTDOutEmitter() *StdOutEmitter { - return &StdOutEmitter{} +func NewSTDOutEmitter[T any]() *StdOutEmitter[T] { + return &StdOutEmitter[T]{} } -func (e *StdOutEmitter) Emit(ctx context.Context, data []byte) error { - fmt.Printf("Data: %v\n", data) +func (e *StdOutEmitter[T]) Emit(ctx context.Context, value T) error { + fmt.Printf("Data: %v\n", value) return nil }