Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Try verbose simulate API to detect issues in pipelines #1786

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
95 changes: 88 additions & 7 deletions internal/elasticsearch/ingest/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,63 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"
"time"

"gopkg.in/yaml.v3"

"github.com/elastic/elastic-package/internal/elasticsearch"
"github.com/elastic/elastic-package/internal/logger"
)

type simulatePipelineRequest struct {
Docs []pipelineDocument `json:"docs"`
}

type simulatePipelineResponse struct {
Docs []pipelineIngestedDocument `json:"docs"`
Docs []struct {
ProcessorResults []verboseProcessorResult `json:"processor_results"`
} `json:"docs"`
}

type verboseProcessorResult struct {
Processor string `json:"processor_type"`
Status string `json:"status"`
Doc pipelineDocument `json:"doc"`
Error verboseProcessorError `json:"error"`
Ignored struct {
Error verboseProcessorError `json:"error"`
} `json:"ignored_error"`
}

type verboseProcessorError struct {
Type string `json:"type"`
Reason string `json:"reason"`
RootCause json.RawMessage `json:"root_cause"`
}

func (e verboseProcessorError) Error() string {
return fmt.Sprintf("[%s] %s", e.Type, e.Reason)
}

type pipelineDocument struct {
Index string `json:"_index"`
Source json.RawMessage `json:"_source"`
Index string `json:"_index"`
Source json.RawMessage `json:"_source"`
Ingest verboseProcessorIngest `json:"_ingest"`
}

type pipelineIngestedDocument struct {
Doc pipelineDocument `json:"doc"`
type verboseProcessorIngest struct {
Pipeline string `json:"pipeline"`
Timestamp time.Time `json:"timestamp"`

OnFailurePipeline string `json:"on_failure_pipeline"`
OnFailureMessage string `json:"on_failure_message"`
OnFailureProcessorTag string `json:"on_failure_processor_tag"`
OnFailureProcessorType string `json:"on_failure_processor_type"`
}

// Pipeline represents a pipeline resource loaded from a file
Expand Down Expand Up @@ -90,6 +122,7 @@ func SimulatePipeline(ctx context.Context, api *elasticsearch.API, pipelineName
r, err := api.Ingest.Simulate(bytes.NewReader(requestBody),
api.Ingest.Simulate.WithContext(ctx),
api.Ingest.Simulate.WithPipelineID(pipelineName),
api.Ingest.Simulate.WithVerbose(true),
)
if err != nil {
return nil, fmt.Errorf("simulate API call failed (pipelineName: %s): %w", pipelineName, err)
Expand All @@ -111,11 +144,59 @@ func SimulatePipeline(ctx context.Context, api *elasticsearch.API, pipelineName
return nil, fmt.Errorf("unmarshalling simulate request failed: %w", err)
}

handleErrors := func(ingest verboseProcessorIngest, errs []error) []error {
var filtered []error
for _, err := range errs {
var processorError verboseProcessorError
if errors.As(err, &processorError) && processorError.Reason == ingest.OnFailureMessage {
continue
}
filtered = append(filtered, err)
}
return filtered
}

processedEvents := make([]json.RawMessage, len(response.Docs))
var errs []error
for i, doc := range response.Docs {
processedEvents[i] = doc.Doc.Source
var source json.RawMessage
failed := false
for _, result := range doc.ProcessorResults {
if result.Doc.Ingest.OnFailureMessage != "" {
// This processor is in an on_failure handler, filter out the handled errors
// and assume that processing is going on.
errs = handleErrors(result.Doc.Ingest, errs)
failed = false
}

switch result.Status {
case "success":
// Keep last successful document.
source = result.Doc.Source
case "dropped":
source = nil
case "skipped":
continue
case "error_ignored":
logger.Debugf("error ignored for processor %s: [%s] %s", result.Processor, result.Ignored.Error.Type, result.Ignored.Error.Reason)
continue
case "error":
failed = true
errs = append(errs, fmt.Errorf("error in processor %s: %w", result.Processor, result.Error))
case "failed":
failed = true
errs = append(errs, fmt.Errorf("%q processor failed", result.Processor))
default:
errs = append(errs, fmt.Errorf("unexpected result status %q for processor %q", result.Status, result.Processor))
}
}

if !failed {
processedEvents[i] = source
}
}
return processedEvents, nil

return processedEvents, errors.Join(errs...)
}

func UninstallPipelines(ctx context.Context, api *elasticsearch.API, pipelines []Pipeline) error {
Expand Down