-
Notifications
You must be signed in to change notification settings - Fork 2.5k
/
Copy pathwriter.go
109 lines (89 loc) · 2.93 KB
/
writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package helper // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
import (
"context"
"fmt"
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
)
// NewWriterConfig returns a WriterConfig whose embedded BasicConfig is
// initialized from the given operator ID and operator type. OutputIDs is left
// at its zero value.
func NewWriterConfig(operatorID, operatorType string) WriterConfig {
	var cfg WriterConfig
	cfg.BasicConfig = NewBasicConfig(operatorID, operatorType)
	return cfg
}
// WriterConfig is the configuration of a writer operator.
// It combines the embedded BasicConfig with the list of operator IDs that the
// built operator should output to.
type WriterConfig struct {
// BasicConfig is embedded and tagged with mapstructure's "squash" option, so
// its fields are decoded at the same level as WriterConfig's own fields.
BasicConfig `mapstructure:",squash"`
// OutputIDs holds the IDs of the output operators; it is decoded from the
// "output" key and copied onto the WriterOperator by Build.
OutputIDs []string `mapstructure:"output"`
}
// Build constructs a WriterOperator from the config.
//
// The embedded BasicConfig is built first using the supplied logger. If that
// fails, the error is returned together with a zero-value WriterOperator.
// On success the result carries the built basic operator and the configured
// OutputIDs; OutputOperators is not populated here.
func (c WriterConfig) Build(logger *zap.SugaredLogger) (WriterOperator, error) {
	var writer WriterOperator

	basic, err := c.BasicConfig.Build(logger)
	if err != nil {
		return writer, err
	}

	writer.BasicOperator = basic
	writer.OutputIDs = c.OutputIDs
	return writer, nil
}
// WriterOperator is an operator that can write to other operators.
// It embeds BasicOperator and tracks its outputs both by ID and, once they
// have been resolved by SetOutputs, by operator value.
type WriterOperator struct {
BasicOperator
// OutputIDs lists the IDs of the operators this writer should output to.
OutputIDs []string
// OutputOperators holds the operators that Write sends entries to; it is
// assigned by SetOutputs.
OutputOperators []operator.Operator
}
// Write passes the entry to every operator in w.OutputOperators, in order.
//
// Every output except the last receives its own copy of the entry (e.Copy());
// the last output receives e itself. Errors returned by Process are
// deliberately discarded, since Write has no error return of its own.
// With no outputs configured, Write does nothing.
func (w *WriterOperator) Write(ctx context.Context, e *entry.Entry) {
	last := len(w.OutputOperators) - 1
	for i, op := range w.OutputOperators {
		out := e
		if i != last {
			// Only the final output may take the original entry; the
			// others each get a separate copy.
			out = e.Copy()
		}
		_ = op.Process(ctx, out)
	}
}
// CanOutput reports whether the operator is able to output entries.
// For a writer operator this is unconditionally true.
func (w *WriterOperator) CanOutput() bool {
	return true
}
// Outputs returns the operators currently stored in w.OutputOperators.
// The slice is returned as-is, without copying.
func (w *WriterOperator) Outputs() []operator.Operator {
	return w.OutputOperators
}
// GetOutputIDs returns the operator IDs currently stored in w.OutputIDs.
// The slice is returned as-is, without copying.
func (w *WriterOperator) GetOutputIDs() []string {
	return w.OutputIDs
}
// SetOutputs resolves every ID in w.OutputIDs against the supplied operators
// and stores the matches, in OutputIDs order, as w.OutputOperators.
//
// An error is returned, and w.OutputOperators is left untouched, when an ID
// has no matching operator or when the matching operator reports that it
// cannot process entries (CanProcess() is false).
func (w *WriterOperator) SetOutputs(operators []operator.Operator) error {
	resolved := make([]operator.Operator, 0, len(w.OutputIDs))
	for _, id := range w.OutputIDs {
		op, found := w.findOperator(operators, id)
		switch {
		case !found:
			return fmt.Errorf("operator '%s' does not exist", id)
		case !op.CanProcess():
			return fmt.Errorf("operator '%s' can not process entries", id)
		}
		resolved = append(resolved, op)
	}

	// Assign only after every ID has resolved, so a failure never leaves a
	// partially-built output list on the operator.
	w.OutputOperators = resolved
	return nil
}
// SetOutputIDs replaces the operator's output IDs with opIDs.
// The slice is stored as-is, without copying, and OutputOperators is not
// updated by this call.
func (w *WriterOperator) SetOutputIDs(opIDs []string) {
	w.OutputIDs = opIDs
}
// findOperator scans operators in order and returns the first one whose ID()
// equals operatorID, together with true. If none matches it returns nil and
// false.
func (w *WriterOperator) findOperator(operators []operator.Operator, operatorID string) (operator.Operator, bool) {
	for i := range operators {
		if candidate := operators[i]; candidate.ID() == operatorID {
			return candidate, true
		}
	}
	return nil, false
}