Skip to content

Commit

Permalink
refactor pipeline to use channels
Browse files Browse the repository at this point in the history
  • Loading branch information
KyleSmith19091 committed Oct 21, 2024
1 parent 34c3979 commit 4450132
Show file tree
Hide file tree
Showing 15 changed files with 286 additions and 238 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@ Pipeline(

### Source

A source component should be responsible for ingesting data from some resilient data store or streams. Sources can either be chained or joined. Chaining sources is useful when you need to combine data from multiple data stores or streams, in a sequence. Joining is useful when you can collect the data from the sources in parallel and you want to merge the data into a single entity.
A source component should be responsible for ingesting data from some resilient data store or streams. Sources can either be chained or joined. Chaining sources is useful when you need to combine data from multiple data stores or streams, in a sequence.

#### Source Operators

- ```SequenceSource(Source[T], ChainedSource[T,V]) -> Source[V]```: Used to chain together two depdent sources, the second source is a special source implementation that takes the result returned from the first source and passes it to the second
- ```Join(Source[T], Source[V], func(T, V) -> K) -> Source[K]```: Used to execute two sources concurrently and join the results using a joining function into a new data source type.

### Stage

Expand Down
17 changes: 0 additions & 17 deletions etl/pipeline/filter.go

This file was deleted.

120 changes: 0 additions & 120 deletions etl/pipeline/join.go

This file was deleted.

13 changes: 0 additions & 13 deletions etl/pipeline/map.go

This file was deleted.

18 changes: 9 additions & 9 deletions etl/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ package pipeline
import "context"

type Pipeline[T, V any] struct {
source Source[T]
stage Stage[T, V]
sink Sink[V]
source source[T]
stage stage[T, V]
sink sink[V]
}

func NewPipeline[T, V any](
source Source[T],
stage Stage[T, V],
sink Sink[V],
source source[T],
stage stage[T, V],
sink sink[V],
) *Pipeline[T, V] {
return &Pipeline[T, V]{
source: source,
Expand All @@ -21,15 +21,15 @@ func NewPipeline[T, V any](
}

func (p *Pipeline[T, V]) Execute(ctx context.Context) error {
values, err := p.source(ctx)
sourceChannel, err := p.source(ctx)
if err != nil {
return err
}

stageValues, err := p.stage(ctx, values)
stageChannel, err := p.stage(ctx, sourceChannel)
if err != nil {
return err
}

return p.sink(ctx, stageValues)
return p.sink(ctx, stageChannel)
}
42 changes: 0 additions & 42 deletions etl/pipeline/sequence.go

This file was deleted.

62 changes: 61 additions & 1 deletion etl/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,64 @@ package pipeline

import "context"

type Sink[T any] func(context.Context, []T) error
type sink[T any] func(context.Context, chan T) error

func Emit[T any](emitFunc func(context.Context, T) error) sink[T] {
return func(ctx context.Context, inChannel chan T) error {
// collect values from channel
for inValue := range inChannel {
if err := emitFunc(ctx, inValue); err != nil {
return err
}
}

return nil
}
}

func Spread[T any](sinks ...sink[T]) sink[T] {
// create list of channels for sinks
sinkChannels := make([]chan T, len(sinks))

return func(ctx context.Context, inChannel chan T) error {
// collect input values from channel
inValues := make([]T, len(inChannel))
idx := 0
for inValue := range inChannel {
inValues[idx] = inValue
idx++
}

// construct sink channel for each sink and load input values
for range sinks {
sinkChannel := make(chan T, len(inChannel))
for _, inValue := range inValues {
sinkChannel <- inValue
}
close(sinkChannel)
sinkChannels = append(sinkChannels, sinkChannel)
}

// execute sinks
for idx, sink := range sinks {
if err := sink(ctx, sinkChannels[idx]); err != nil {
return err
}
}

return nil
}
}

func SequenceSink[T any](
sinks ...sink[T],
) sink[T] {
return func(ctx context.Context, c chan T) error {
for _, sink := range sinks {
if err := sink(ctx, c); err != nil {
return err
}
}
return nil
}
}
Loading

0 comments on commit 4450132

Please sign in to comment.