Skip to content

Commit

Permalink
Merge pull request #5 from meshtrade/use-defers
Browse files Browse the repository at this point in the history
use defers for closing channels
  • Loading branch information
KyleSmith19091 authored Nov 11, 2024
2 parents 5236c18 + bce7cff commit 2ae43ea
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 10 deletions.
5 changes: 2 additions & 3 deletions etl/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,14 @@ func Spread[T any](sinks ...sink[T]) sink[T] {
for range sinks {
// allocate sink channel to hold values
sinkChannel := make(chan T, len(inChannel))
// close channel to indicate no more data will be sent
defer close(sinkChannel)

// load input values into new sink channel
for _, inValue := range inValues {
sinkChannel <- inValue
}

// close channel to indicate no more data will be sent
close(sinkChannel)

// add channel to list of channels
sinkChannels = append(sinkChannels, sinkChannel)
}
Expand Down
8 changes: 4 additions & 4 deletions etl/pipeline/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ func SourceBatch[T any](sourceFunc func(context.Context, *PipelineState) ([]T, e

// create batch channel
batchChan := make(chan T, len(batch))
defer close(batchChan)

// load batch into channel into channel if nothing went wrong
for _, element := range batch {
batchChan <- element
}
close(batchChan)

return batchChan, nil
}
Expand All @@ -44,10 +44,10 @@ func ChainedSourceBatch[T, V any](sourceFunc func(context.Context, *PipelineStat
}

outChannel := make(chan V, len(outValues))
defer close(outChannel)
for _, outValue := range outValues {
outChannel <- outValue
}
close(outChannel)

return outChannel, nil
}
Expand Down Expand Up @@ -91,10 +91,10 @@ func ChainedSourceScalar[T, V any](sourceFunc func(context.Context, *PipelineSta

// collect output
outChannel := make(chan V, len(outValues))
defer close(outChannel)
for _, outValue := range outValues {
outChannel <- outValue
}
close(outChannel)

return outChannel, nil
}
Expand All @@ -113,10 +113,10 @@ func SequenceSource[T, V any](source1 source[T], source2 chainedSource[T, V]) so

// load source2 data into channel
chainChannel := make(chan V, len(source2Chan))
defer close(chainChannel)
for source2Value := range source2Chan {
chainChannel <- source2Value
}
close(chainChannel)

return chainChannel, err
}
Expand Down
7 changes: 4 additions & 3 deletions etl/pipeline/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ func Map[T, V any](mapFunc func(context.Context, *PipelineState, T) V) stage[T,
return func(ctx context.Context, p *PipelineState, inChannel chan T) (chan V, error) {
// create channel with buffer for each element in inChannel
mapChan := make(chan V, len(inChannel))
defer close(mapChan)

// map values from inChannel
for inValue := range inChannel {
mapChan <- mapFunc(ctx, p, inValue)
}
close(mapChan)

return mapChan, nil
}
Expand All @@ -26,6 +26,7 @@ func Filter[T any](filterFunc func(context.Context, *PipelineState, T) bool) sta
return func(ctx context.Context, p *PipelineState, inChannel chan T) (chan T, error) {
// optimistically allocate buffer for all elements in input channel
filterChan := make(chan T, len(inChannel))
defer close(filterChan)

// filter values from inChannel
for inValue := range inChannel {
Expand All @@ -48,17 +49,17 @@ func Shuffle[T any]() stage[T, T] {
idx++
}

// shuffle the data (NOTE: Assuming Go 1.20 which automatically sets seed)
// shuffle the data (NOTE: Assuming Go 1.20 runtime which automatically sets seed)
rand.Shuffle(len(inputValues), func(i, j int) {
inputValues[i], inputValues[j] = inputValues[j], inputValues[i]
})

// load data into return channel
outChannel := make(chan T, len(inputValues))
defer close(outChannel)
for _, inputValue := range inputValues {
outChannel <- inputValue
}
close(outChannel)

return outChannel, nil
}
Expand Down

0 comments on commit 2ae43ea

Please sign in to comment.