Skip to content

Commit

Permalink
Run light metricset processors when standardizing events on integrati…
Browse files Browse the repository at this point in the history
…on tests (#22854) (#23061)

(cherry picked from commit 610598d)
  • Loading branch information
jsoriano authored Dec 10, 2020
1 parent 2f1f269 commit 9cfcb09
Show file tree
Hide file tree
Showing 5 changed files with 222 additions and 0 deletions.
13 changes: 13 additions & 0 deletions metricbeat/mb/testing/data_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,19 @@ func StandardizeEvent(ms mb.MetricSet, e mb.Event, modifiers ...mb.EventModifier

fullEvent := e.BeatEvent(ms.Module().Name(), ms.Name(), modifiers...)

// Run processors if defined for the metricset, it can happen for light metricsets
// with processors in the manifest.
processors, err := mb.Registry.ProcessorsForMetricSet(ms.Module().Name(), ms.Name())
if err == nil && processors != nil {
enriched, err := processors.Run(&fullEvent)
if err != nil {
panic(err)
}
if enriched != nil {
fullEvent = *enriched
}
}

return fullEvent
}

Expand Down
139 changes: 139 additions & 0 deletions metricbeat/mb/testing/lightmodules_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// +build !integration

package testing

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/metricbeat/mb"

// Processor in the light module
_ "github.com/elastic/beats/v7/libbeat/processors/actions"

// Input used in the light module
_ "github.com/elastic/beats/v7/metricbeat/module/http/json"
)

func init() {
// To be moved to some kind of helper
os.Setenv("BEAT_STRICT_PERMS", "false")
mb.Registry.SetSecondarySource(mb.NewLightModulesSource("./testdata/lightmodules"))
}

func TestFetchLightModuleWithProcessors(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Content-Type", "application/json")
fmt.Fprintln(w, `{"foo":"bar"}`)
}))
defer ts.Close()

config := map[string]interface{}{
"module": "test",
"metricsets": []string{"json"},
"hosts": []string{ts.URL},
"namespace": "test",
}
ms := NewFetcher(t, config)
events, errs := ms.FetchEvents()
assert.Empty(t, errs)
assert.NotEmpty(t, events)

expected := common.MapStr{
"http": common.MapStr{
"test": common.MapStr{
"foo": "bar",
},
},
"service": common.MapStr{
"type": "test",
},

// From the processor in the light module
"fields": common.MapStr{
"test": "fromprocessor",
},
}
event := StandardizeEvent(ms.(mb.MetricSet), events[0])
assert.EqualValues(t, expected, event.Fields)
}

func TestDataLightModuleWithProcessors(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Content-Type", "application/json")
fmt.Fprintln(w, `{"foo":"bar"}`)
}))
defer ts.Close()

config := map[string]interface{}{
"module": "test",
"metricsets": []string{"json"},
"hosts": []string{ts.URL},
"namespace": "test",
}
ms := NewFetcher(t, config)
events, errs := ms.FetchEvents()
assert.Empty(t, errs)
assert.NotEmpty(t, events)

dir, err := ioutil.TempDir("", "_meta-*")
require.NoError(t, err)
defer os.RemoveAll(dir)

dataPath := filepath.Join(dir, "data.json")

ms.WriteEvents(t, dataPath)

var event struct {
Event struct {
Dataset string `json:"dataset"`
} `json:"event"`
Http struct {
Test struct {
Foo string `json:"foo"`
} `json:"test"`
} `json:"http"`

// From the processor in the light module
Fields struct {
Test string `json:"test"`
}
}

d, err := ioutil.ReadFile(dataPath)
require.NoError(t, err)

err = json.Unmarshal(d, &event)
require.NoError(t, err)

assert.Equal(t, "http.test", event.Event.Dataset)
assert.Equal(t, "bar", event.Http.Test.Foo)
assert.Equal(t, "fromprocessor", event.Fields.Test)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
default: true
input:
module: http
metricset: json
processors:
- add_fields:
fields:
test: "fromprocessor"
3 changes: 3 additions & 0 deletions metricbeat/mb/testing/testdata/lightmodules/test/module.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
name: test
metricsets:
- json
59 changes: 59 additions & 0 deletions metricbeat/tests/system/test_lightmodules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import http.server
import metricbeat
import os
import os.path
import shutil
import sys
import threading
import unittest
import json

from contextlib import contextmanager


class Test(metricbeat.BaseTest):
def test_processors(self):
shutil.copytree(
os.path.join(self.beat_path, "mb/testing/testdata/lightmodules"),
os.path.join(self.working_dir, "module"),
)

with http_test_server() as server:
self.render_config_template(modules=[{
"name": "test",
"metricsets": ["json"],
"namespace": "test",
# Hard-coding 'localhost' because hostname in server.server_name doesn't always work.
"hosts": [f"localhost:{server.server_port}"],
}])

proc = self.start_beat()

self.wait_until(lambda: self.output_lines() > 0)
proc.check_kill_and_wait()

self.assert_no_logged_warnings()

output = self.read_output_json()
self.assertGreater(len(output), 0)

for evt in output:
self.assertEqual(evt["fields"]["test"], "fromprocessor")


@contextmanager
def http_test_server():
server = http.server.HTTPServer(('localhost', 0), TestHTTPHandler)
child = threading.Thread(target=server.serve_forever)
child.start()
yield server
server.shutdown()
child.join()


class TestHTTPHandler(http.server.BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"foo": "bar"}).encode("utf-8"))

0 comments on commit 9cfcb09

Please sign in to comment.