Skip to content

Commit

Permalink
Merge pull request #3 from meshtrade/channel-refactor
Browse files Browse the repository at this point in the history
Refactor: pipeline to use channels
  • Loading branch information
KyleSmith19091 authored Oct 26, 2024
2 parents 34c3979 + 11ec6f5 commit 2587dc9
Show file tree
Hide file tree
Showing 18 changed files with 483 additions and 278 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@ Pipeline(

### Source

A source component should be responsible for ingesting data from some resilient data store or streams. Sources can either be chained or joined. Chaining sources is useful when you need to combine data from multiple data stores or streams, in a sequence. Joining is useful when you can collect the data from the sources in parallel and you want to merge the data into a single entity.
A source component should be responsible for ingesting data from some resilient data store or streams. Sources can either be chained or joined. Chaining sources is useful when you need to combine data from multiple data stores or streams, in a sequence.

#### Source Operators

- ```SequenceSource(Source[T], ChainedSource[T,V]) -> Source[V]```: Used to chain together two depdent sources, the second source is a special source implementation that takes the result returned from the first source and passes it to the second
- ```Join(Source[T], Source[V], func(T, V) -> K) -> Source[K]```: Used to execute two sources concurrently and join the results using a joining function into a new data source type.

### Stage

Expand Down
11 changes: 9 additions & 2 deletions etl/mongo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mongo
import (
"context"

"github.com/meshtrade/mesh-etl/etl/pipeline"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
Expand All @@ -21,7 +22,7 @@ func NewMongoCollector[T any](collection mongo.Collection, query bson.D, opts ..
}
}

func (m *MongoCollector[T]) Collect(ctx context.Context) ([]T, error) {
func (m *MongoCollector[T]) Collect(ctx context.Context, pipelineState *pipeline.PipelineState) (chan T, error) {
cursor, err := m.collection.Find(
ctx,
m.query,
Expand All @@ -36,5 +37,11 @@ func (m *MongoCollector[T]) Collect(ctx context.Context) ([]T, error) {
return nil, err
}

return records, nil
outputChannel := make(chan T, len(records))
for _, record := range records {
outputChannel <- record
}
close(outputChannel)

return outputChannel, nil
}
16 changes: 14 additions & 2 deletions etl/parquet/serialiser.go → etl/parquet/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/apache/arrow/go/v17/arrow/memory"
"github.com/apache/arrow/go/v17/parquet"
"github.com/apache/arrow/go/v17/parquet/pqarrow"
"github.com/meshtrade/mesh-etl/etl/pipeline"
"github.com/rs/zerolog/log"
)

Expand Down Expand Up @@ -97,7 +98,13 @@ func buildArrowFieldsAndBuilders(pool memory.Allocator, elemType reflect.Type) (
return arrowFields, fieldBuilders, nil
}

func (s *ParquetSerialiser[T]) Marshal(ctx context.Context, inputStruct []T) ([]byte, error) {
func (s *ParquetSerialiser[T]) Serialise(ctx context.Context, p *pipeline.PipelineState, inChannel chan T) (chan []byte, error) {
// collect values from channel
inputStruct := []T{}
for inValue := range inChannel {
inputStruct = append(inputStruct, inValue)
}

// get the reflection value of the input slice
timeType := reflect.TypeOf(time.Time{})

Expand Down Expand Up @@ -166,7 +173,12 @@ func (s *ParquetSerialiser[T]) Marshal(ctx context.Context, inputStruct []T) ([]
// NOTE: NEVER call close in defer function!
pw.Close()

return dataBuffer.Bytes(), nil
// load value into output channel
outputChannel := make(chan []byte, 1)
outputChannel <- dataBuffer.Bytes()
close(outputChannel)

return outputChannel, nil
}

func (s *ParquetSerialiser[T]) appendStructValues(builder *array.StructBuilder, structVal reflect.Value) error {
Expand Down
17 changes: 0 additions & 17 deletions etl/pipeline/filter.go

This file was deleted.

120 changes: 0 additions & 120 deletions etl/pipeline/join.go

This file was deleted.

13 changes: 0 additions & 13 deletions etl/pipeline/map.go

This file was deleted.

24 changes: 15 additions & 9 deletions etl/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,39 @@ package pipeline
import "context"

type Pipeline[T, V any] struct {
source Source[T]
stage Stage[T, V]
sink Sink[V]
source source[T]
stage stage[T, V]
sink sink[V]
state *PipelineState
}

func NewPipeline[T, V any](
source Source[T],
stage Stage[T, V],
sink Sink[V],
source source[T],
stage stage[T, V],
sink sink[V],
) *Pipeline[T, V] {
return &Pipeline[T, V]{
source: source,
stage: stage,
sink: sink,
state: NewPipelineState(),
}
}

func (p *Pipeline[T, V]) Execute(ctx context.Context) error {
values, err := p.source(ctx)
sourceChannel, err := p.source(ctx, p.state)
if err != nil {
return err
}

stageValues, err := p.stage(ctx, values)
stageChannel, err := p.stage(ctx, p.state, sourceChannel)
if err != nil {
return err
}

return p.sink(ctx, stageValues)
if err := p.sink(ctx, p.state, stageChannel); err != nil {
return err
}

return p.state.RunAfterEffects(ctx)
}
41 changes: 41 additions & 0 deletions etl/pipeline/pipelineState.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package pipeline

import (
"context"

"golang.org/x/sync/errgroup"
)

type AfterEffect func(ctx context.Context) error

type PipelineState struct {
afterEffects []AfterEffect
}

func NewPipelineState() *PipelineState {
return &PipelineState{
afterEffects: []AfterEffect{},
}
}

func (p *PipelineState) RegisterAfterEffect(afterEffect AfterEffect) {
p.afterEffects = append(p.afterEffects, afterEffect)
}

func (p *PipelineState) RunAfterEffects(ctx context.Context) error {
errGroup := new(errgroup.Group)

for _, afterEffect := range p.afterEffects {
errGroup.Go(func() error {
return afterEffect(ctx)
})
}

if err := errGroup.Wait(); err != nil {
return err
}

p.afterEffects = []AfterEffect{}

return nil
}
42 changes: 0 additions & 42 deletions etl/pipeline/sequence.go

This file was deleted.

Loading

0 comments on commit 2587dc9

Please sign in to comment.