Skip to content

Commit

Permalink
start of filebeatreceiver integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
leehinman committed Oct 2, 2024
1 parent 6138d59 commit 8763c69
Showing 1 changed file with 118 additions and 10 deletions.
128 changes: 118 additions & 10 deletions testing/integration/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"strings"
"sync"
"testing"
"text/template"
"time"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -129,7 +130,7 @@ func TestOtelFileProcessing(t *testing.T) {
// otel mode should be detected automatically
tempDir := t.TempDir()
cfgFilePath := filepath.Join(tempDir, "otel.yml")
require.NoError(t, os.WriteFile(cfgFilePath, []byte(fileProcessingConfig), 0600))
require.NoError(t, os.WriteFile(cfgFilePath, []byte(fileProcessingConfig), 0o600))

fixture, err := define.NewFixtureFromLocalBuild(t, define.Version(), aTesting.WithAdditionalArgs([]string{"--config", cfgFilePath}))
require.NoError(t, err)
Expand Down Expand Up @@ -204,7 +205,7 @@ func TestOtelFileProcessing(t *testing.T) {

func validateCommandIsWorking(t *testing.T, ctx context.Context, fixture *aTesting.Fixture, tempDir string) {
cfgFilePath := filepath.Join(tempDir, "otel-valid.yml")
require.NoError(t, os.WriteFile(cfgFilePath, []byte(fileProcessingConfig), 0600))
require.NoError(t, os.WriteFile(cfgFilePath, []byte(fileProcessingConfig), 0o600))

// check `elastic-agent otel validate` command works for otel config
cmd, err := fixture.PrepareAgentCommand(ctx, []string{"otel", "validate", "--config", cfgFilePath})
Expand All @@ -220,7 +221,7 @@ func validateCommandIsWorking(t *testing.T, ctx context.Context, fixture *aTesti

// check `elastic-agent otel validate` command works for invalid otel config
cfgFilePath = filepath.Join(tempDir, "otel-invalid.yml")
require.NoError(t, os.WriteFile(cfgFilePath, []byte(fileInvalidOtelConfig), 0600))
require.NoError(t, os.WriteFile(cfgFilePath, []byte(fileInvalidOtelConfig), 0o600))

out, err = fixture.Exec(ctx, []string{"otel", "validate", "--config", cfgFilePath})
require.Error(t, err)
Expand Down Expand Up @@ -297,7 +298,7 @@ func TestOtelLogsIngestion(t *testing.T) {
logsIngestionConfig = strings.ReplaceAll(logsIngestionConfig, "{{.TestId}}", testId)

cfgFilePath := filepath.Join(tempDir, "otel.yml")
require.NoError(t, os.WriteFile(cfgFilePath, []byte(logsIngestionConfig), 0600))
require.NoError(t, os.WriteFile(cfgFilePath, []byte(logsIngestionConfig), 0o600))

fixture, err := define.NewFixtureFromLocalBuild(t, define.Version(), aTesting.WithAdditionalArgs([]string{"--config", cfgFilePath}))
require.NoError(t, err)
Expand Down Expand Up @@ -337,7 +338,7 @@ func TestOtelLogsIngestion(t *testing.T) {

// Write logs to input file.
logsCount := 10_000
inputFile, err := os.OpenFile(inputFilePath, os.O_CREATE|os.O_WRONLY, 0600)
inputFile, err := os.OpenFile(inputFilePath, os.O_CREATE|os.O_WRONLY, 0o600)
require.NoError(t, err)
for i := 0; i < logsCount; i++ {
_, err = fmt.Fprintf(inputFile, "This is a test log message %d\n", i+1)
Expand Down Expand Up @@ -394,8 +395,8 @@ func TestOtelAPMIngestion(t *testing.T) {
cfgFilePath := filepath.Join(tempDir, "otel.yml")
fileName := "content.log"
apmConfig := fmt.Sprintf(apmOtelConfig, filepath.Join(tempDir, fileName), testId)
require.NoError(t, os.WriteFile(cfgFilePath, []byte(apmConfig), 0600))
require.NoError(t, os.WriteFile(filepath.Join(tempDir, fileName), []byte{}, 0600))
require.NoError(t, os.WriteFile(cfgFilePath, []byte(apmConfig), 0o600))
require.NoError(t, os.WriteFile(filepath.Join(tempDir, fileName), []byte{}, 0o600))

fixture, err := define.NewFixtureFromLocalBuild(t, define.Version(), aTesting.WithAdditionalArgs([]string{"--config", cfgFilePath}))
require.NoError(t, err)
Expand Down Expand Up @@ -479,7 +480,7 @@ func TestOtelAPMIngestion(t *testing.T) {
err,
)

require.NoError(t, os.WriteFile(filepath.Join(tempDir, fileName), []byte(apmProcessingContent), 0600))
require.NoError(t, os.WriteFile(filepath.Join(tempDir, fileName), []byte(apmProcessingContent), 0o600))

// check index
var hits int
Expand All @@ -489,10 +490,12 @@ func TestOtelAPMIngestion(t *testing.T) {

// apm mismatch or proper docs in ES

watchLines := linesTrackMap([]string{"This is a test error message",
watchLines := linesTrackMap([]string{
"This is a test error message",
"This is a test debug message 2",
"This is a test debug message 3",
"This is a test debug message 4"})
"This is a test debug message 4",
})

// failed to get APM version mismatch in time
// processing should be running
Expand Down Expand Up @@ -615,3 +618,108 @@ func testAgentCanRun(ctx context.Context, t *testing.T, fixture *atesting.Fixtur
)
}
}

func TestFileBeatReceiver(t *testing.T) {
define.Require(t, define.Requirements{
Group: Default,
Local: true,
OS: []define.OS{
// {Type: define.Windows}, we don't support otel on Windows yet
{Type: define.Linux},
{Type: define.Darwin},
},
})

type otelConfigOptions struct {
Message string
Output string
HomeDir string
}
testMessage := "supercalifragilisticexpialidocious"
tmpDir := t.TempDir()
exporterOutputPath := filepath.Join(tmpDir, "output.json")
t.Cleanup(func() {
if t.Failed() {
contents, err := os.ReadFile(exporterOutputPath)
if err != nil {
t.Logf("No exporter output file")
return
}
t.Logf("Contents of exporter output file:\n%s\n", string(contents))
}
})
otelConfigPath := filepath.Join(tmpDir, "otel.yml")
otelConfigTemplate := `receivers:
filebeatreceiver:
filebeat:
inputs:
- type: benchmark
enabled: true
count: 1
message: {{.Message}}
output:
otelconsumer:
logging:
level: info
selectors:
- '*'
path.home: {{.HomeDir}}
exporters:
file/no_rotation:
path: {{.Output}}
service:
pipelines:
logs:
receivers: [filebeatreceiver]
exporters: [file/no_rotation]
`

var otelConfigBuffer bytes.Buffer
require.NoError(t,
template.Must(template.New("otelConfig").Parse(otelConfigTemplate)).Execute(&otelConfigBuffer,
otelConfigOptions{
Message: testMessage,
Output: exporterOutputPath,
HomeDir: tmpDir,
}))
require.NoError(t, os.WriteFile(otelConfigPath, otelConfigBuffer.Bytes(), 0o600))
t.Cleanup(func() {
if t.Failed() {
contents, err := os.ReadFile(otelConfigPath)
if err != nil {
t.Logf("no otel config file")
return
}
t.Logf("Contents of otel config file:\n%s\n", string(contents))
}
})
fixture, err := define.NewFixtureFromLocalBuild(t, define.Version(), aTesting.WithAdditionalArgs([]string{"--config", otelConfigPath}))
require.NoError(t, err)

ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute))
defer cancel()
err = fixture.Prepare(ctx, fakeComponent)
require.NoError(t, err)

var fixtureWg sync.WaitGroup
fixtureWg.Add(1)
go func() {
defer fixtureWg.Done()
err = fixture.RunOtelWithClient(ctx, false, false)
}()

require.Eventually(t,
func() bool {
content, err := os.ReadFile(exporterOutputPath)
if err != nil || len(content) == 0 {
return false
}
return bytes.Contains(content, []byte(testMessage))
},
3*time.Minute, 1*time.Second,
fmt.Sprintf("there should be exported logs by now"))

cancel()
fixtureWg.Wait()
require.True(t, err == nil || err == context.Canceled || err == context.DeadlineExceeded, "Retrieved unexpected error: %s", err.Error())
}

0 comments on commit 8763c69

Please sign in to comment.