Skip to content

Commit

Permalink
feat(pipeline): allow reader/writer customization. (#105)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zenithar authored Feb 2, 2022
1 parent ef679f5 commit 2f7520f
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 25 deletions.
18 changes: 18 additions & 0 deletions pkg/bundle/pipeline/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@

package pipeline

import "io"

// Options defines default options.
type Options struct {
input io.Reader
output io.Writer
disableOutput bool
fpf FileProcessorFunc
ppf PackageProcessorFunc
Expand All @@ -29,6 +33,20 @@ type Options struct {
// Option represents option function
type Option func(*Options)

// InputReader defines the input reader used to retrieve the bundle content.
func InputReader(value io.Reader) Option {
return func(opts *Options) {
opts.input = value
}
}

// OutputWriter defines where the bundle will be written after process execution.
func OutputWriter(value io.Writer) Option {
return func(opts *Options) {
opts.output = value
}
}

// OutputDisabled assign the value to disableOutput option.
func OutputDisabled() Option {
return func(opts *Options) {
Expand Down
64 changes: 39 additions & 25 deletions pkg/bundle/pipeline/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,44 +22,27 @@ import (
"fmt"
"os"

"github.com/gosimple/slug"

bundlev1 "github.com/elastic/harp/api/gen/go/harp/bundle/v1"
"github.com/elastic/harp/build/version"
"github.com/elastic/harp/pkg/bundle"
"github.com/elastic/harp/pkg/sdk/log"
)

// Run a processor.
func Run(ctx context.Context, name string, opts ...Option) error {
func Run(ctx context.Context, opts ...Option) error {
// Initialize a running context to attach all goroutines
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Initialize logger
log.Setup(ctx,
&log.Options{
Debug: true,
AppName: slug.Make(name),
AppID: version.ID(),
Version: version.Version,
Revision: version.Commit,
},
)

// Read bundle from Stdin
b, err := bundle.FromContainerReader(os.Stdin)
if err != nil {
return fmt.Errorf("unable to read bundle from stdin: %w", err)
}

const (
var (
defaultDisableOutput = false
defaultReader = os.Stdin
defaultWriter = os.Stdout
)

v := &bundleVisitor{
ctx: ctx,
opts: &Options{
input: defaultReader,
output: defaultWriter,
disableOutput: defaultDisableOutput,
},
position: &defaultContext{},
Expand All @@ -70,6 +53,12 @@ func Run(ctx context.Context, name string, opts ...Option) error {
opt(v.opts)
}

// Read bundle from Stdin
b, err := bundle.FromContainerReader(v.opts.input)
if err != nil {
return fmt.Errorf("unable to read bundle from stdin: %w", err)
}

// Apply remapping strategy
v.VisitForFile(b)

Expand All @@ -80,15 +69,40 @@ func Run(ctx context.Context, name string, opts ...Option) error {

if !v.opts.disableOutput {
// Write output bundle
if err := bundle.ToContainerWriter(os.Stdout, b); err != nil {
return fmt.Errorf("unable to dump remapped bundle content: %w", err)
if err := bundle.ToContainerWriter(v.opts.output, b); err != nil {
return fmt.Errorf("unable to dump processed bundle content: %w", err)
}
}

// No error
return nil
}

// Apply a pipeline process to the given bundle
func Apply(ctx context.Context, input *bundlev1.Bundle, opts ...Option) (*bundlev1.Bundle, error) {
v := &bundleVisitor{
ctx: ctx,
opts: &Options{},
position: &defaultContext{},
}

// Loop through each option
for _, opt := range opts {
opt(v.opts)
}

// Apply remapping strategy
v.VisitForFile(input)

// Check error
if err := v.Error(); err != nil {
return nil, fmt.Errorf("error during bundle processing: %w", err)
}

// No error
return input, nil
}

// -----------------------------------------------------------------------------

// Context is used to pass current node location to processor
Expand Down

0 comments on commit 2f7520f

Please sign in to comment.