diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 1a88c68cc885..ca4837a0e934 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -221,6 +221,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff] - Add experimental Elasticsearch index metricset. {pull}6881[6881] - Add dashboards and visualizations for haproxy metrics. {pull}6934[6934] - Add message rates to the RabbitMQ queue metricset {issue}6442[6442] {pull}6606[6606] +- Add exchanges metricset to the RabbitMQ module {issue}6442[6442] {pull}6607[6607] *Packetbeat* diff --git a/metricbeat/docs/fields.asciidoc b/metricbeat/docs/fields.asciidoc index a03c600a9b83..56dab7aba6ec 100644 --- a/metricbeat/docs/fields.asciidoc +++ b/metricbeat/docs/fields.asciidoc @@ -10912,6 +10912,113 @@ type: long Number of octets received on the connection. +-- + +[float] +== exchange fields + +exchange + + + +*`rabbitmq.exchange.name`*:: ++ +-- +type: keyword + +The name of the queue with non-ASCII characters escaped as in C. + + +-- + +*`rabbitmq.exchange.vhost`*:: ++ +-- +type: keyword + +Virtual host name with non-ASCII characters escaped as in C. + + +-- + +*`rabbitmq.exchange.durable`*:: ++ +-- +type: boolean + +Whether or not the queue survives server restarts. + + +-- + +*`rabbitmq.exchange.auto_delete`*:: ++ +-- +type: boolean + +Whether the queue will be deleted automatically when no longer used. + + +-- + +*`rabbitmq.exchange.internal`*:: ++ +-- +type: boolean + +Whether the exchange is internal, i.e. cannot be directly published to by a client. + + +-- + +*`rabbitmq.exchange.user`*:: ++ +-- +type: keyword + +User who created the exchange. + + +-- + +*`rabbitmq.exchange.messages.publish_in.count`*:: ++ +-- +type: long + +Count of messages published "in" to an exchange, i.e. not taking account of routing. + + +-- + +*`rabbitmq.exchange.messages.publish_in.details.rate`*:: ++ +-- +type: float + +How much the exchange publish-in count has changed per second in the most recent sampling interval. + + +-- + +*`rabbitmq.exchange.messages.publish_out.count`*:: ++ +-- +type: long + +Count of messages published "out" of an exchange, i.e. taking account of routing. + + +-- + +*`rabbitmq.exchange.messages.publish_out.details.rate`*:: ++ +-- +type: float + +How much the exchange publish-out count has changed per second in the most recent sampling interval. + + -- [float] diff --git a/metricbeat/docs/modules/rabbitmq.asciidoc b/metricbeat/docs/modules/rabbitmq.asciidoc index 761a48ed0adc..6d19884154b5 100644 --- a/metricbeat/docs/modules/rabbitmq.asciidoc +++ b/metricbeat/docs/modules/rabbitmq.asciidoc @@ -40,12 +40,16 @@ The following metricsets are available: * <> +* <> + * <> * <> include::rabbitmq/connection.asciidoc[] +include::rabbitmq/exchange.asciidoc[] + include::rabbitmq/node.asciidoc[] include::rabbitmq/queue.asciidoc[] diff --git a/metricbeat/docs/modules/rabbitmq/exchange.asciidoc b/metricbeat/docs/modules/rabbitmq/exchange.asciidoc new file mode 100644 index 000000000000..b1a84ef6186f --- /dev/null +++ b/metricbeat/docs/modules/rabbitmq/exchange.asciidoc @@ -0,0 +1,23 @@ +//// +This file is generated! See scripts/docs_collector.py +//// + +[[metricbeat-metricset-rabbitmq-exchange]] +=== RabbitMQ exchange metricset + +beta[] + +include::../../../module/rabbitmq/exchange/_meta/docs.asciidoc[] + + +==== Fields + +For a description of each field in the metricset, see the +<> section. + +Here is an example document generated by this metricset: + +[source,json] +---- +include::../../../module/rabbitmq/exchange/_meta/data.json[] +---- diff --git a/metricbeat/docs/modules_list.asciidoc b/metricbeat/docs/modules_list.asciidoc index dc5564f3890f..f178cd9f2c31 100644 --- a/metricbeat/docs/modules_list.asciidoc +++ b/metricbeat/docs/modules_list.asciidoc @@ -98,7 +98,8 @@ This file is generated! See scripts/docs_collector.py .2+| .2+| |<> beta[] |<> beta[] |<> beta[] |image:./images/icon-yes.png[Prebuilt dashboards are available] | -.3+| .3+| |<> beta[] +.4+| .4+| |<> beta[] +|<> beta[] |<> beta[] |<> beta[] |<> |image:./images/icon-yes.png[Prebuilt dashboards are available] | diff --git a/metricbeat/include/list.go b/metricbeat/include/list.go index 63ca244bca99..db04d72c0dac 100644 --- a/metricbeat/include/list.go +++ b/metricbeat/include/list.go @@ -103,6 +103,7 @@ import ( _ "github.com/elastic/beats/metricbeat/module/prometheus/stats" _ "github.com/elastic/beats/metricbeat/module/rabbitmq" _ "github.com/elastic/beats/metricbeat/module/rabbitmq/connection" + _ "github.com/elastic/beats/metricbeat/module/rabbitmq/exchange" _ "github.com/elastic/beats/metricbeat/module/rabbitmq/node" _ "github.com/elastic/beats/metricbeat/module/rabbitmq/queue" _ "github.com/elastic/beats/metricbeat/module/redis" diff --git a/metricbeat/module/rabbitmq/_meta/Dockerfile b/metricbeat/module/rabbitmq/_meta/Dockerfile index 21a28e2a78e3..10c10faa8bc4 100644 --- a/metricbeat/module/rabbitmq/_meta/Dockerfile +++ b/metricbeat/module/rabbitmq/_meta/Dockerfile @@ -1,4 +1,4 @@ -FROM rabbitmq:3-management +FROM rabbitmq:3.7.4-management RUN apt-get update && apt-get install -y netcat && apt-get clean HEALTHCHECK --interval=1s --retries=90 CMD nc -w 1 -v 127.0.0.1 15672 + exchange + release: beta + fields: + - name: name + type: keyword + description: > + The name of the queue with non-ASCII characters escaped as in C. + - name: vhost + type: keyword + description: > + Virtual host name with non-ASCII characters escaped as in C. + - name: durable + type: boolean + description: > + Whether or not the queue survives server restarts. + - name: auto_delete + type: boolean + description: > + Whether the queue will be deleted automatically when no longer used. + - name: internal + type: boolean + description: > + Whether the exchange is internal, i.e. cannot be directly published to by a client. + - name: user + type: keyword + description: > + User who created the exchange. + + - name: messages.publish_in.count + type: long + description: > + Count of messages published "in" to an exchange, i.e. not taking account of routing. + - name: messages.publish_in.details.rate + type: float + description: > + How much the exchange publish-in count has changed per second in the most recent sampling interval. + - name: messages.publish_out.count + type: long + description: > + Count of messages published "out" of an exchange, i.e. taking account of routing. + - name: messages.publish_out.details.rate + type: float + description: > + How much the exchange publish-out count has changed per second in the most recent sampling interval. diff --git a/metricbeat/module/rabbitmq/exchange/data.go b/metricbeat/module/rabbitmq/exchange/data.go new file mode 100644 index 000000000000..beaabf788d43 --- /dev/null +++ b/metricbeat/module/rabbitmq/exchange/data.go @@ -0,0 +1,61 @@ +package exchange + +import ( + "encoding/json" + + "github.com/elastic/beats/libbeat/common" + s "github.com/elastic/beats/libbeat/common/schema" + c "github.com/elastic/beats/libbeat/common/schema/mapstriface" + "github.com/elastic/beats/libbeat/logp" +) + +var ( + schema = s.Schema{ + "name": c.Str("name"), + "vhost": c.Str("vhost"), + "type": c.Str("type"), + "durable": c.Bool("durable"), + "auto_delete": c.Bool("auto_delete"), + "internal": c.Bool("internal"), + "arguments": c.Dict("arguments", s.Schema{}), + "user": c.Str("user_who_performed_action", s.Optional), + "messages": c.Dict("message_stats", s.Schema{ + "publish_in": s.Object{ + "count": c.Int("publish_in", s.Optional), + "details": c.Dict("publish_in_details", s.Schema{ + "rate": c.Float("rate"), + }, c.DictOptional), + }, + "publish_out": s.Object{ + "count": c.Int("publish_out", s.Optional), + "details": c.Dict("publish_out_details", s.Schema{ + "rate": c.Float("rate"), + }, c.DictOptional), + }, + }, c.DictOptional), + } +) + +func eventsMapping(content []byte) ([]common.MapStr, error) { + var exchanges []map[string]interface{} + err := json.Unmarshal(content, &exchanges) + if err != nil { + logp.Err("Error: ", err) + return nil, err + } + + events := []common.MapStr{} + errors := s.NewErrors() + + for _, exchange := range exchanges { + event, errs := eventMapping(exchange) + events = append(events, event) + errors.AddErrors(errs) + } + + return events, errors +} + +func eventMapping(exchange map[string]interface{}) (common.MapStr, *s.Errors) { + return schema.Apply(exchange) +} diff --git a/metricbeat/module/rabbitmq/exchange/exchange.go b/metricbeat/module/rabbitmq/exchange/exchange.go new file mode 100644 index 000000000000..aa895d400fd3 --- /dev/null +++ b/metricbeat/module/rabbitmq/exchange/exchange.go @@ -0,0 +1,71 @@ +package exchange + +import ( + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/cfgwarn" + "github.com/elastic/beats/metricbeat/helper" + "github.com/elastic/beats/metricbeat/mb" + "github.com/elastic/beats/metricbeat/mb/parse" +) + +const ( + defaultScheme = "http" + defaultPath = "/api/exchanges" +) + +var ( + hostParser = parse.URLHostParserBuilder{ + DefaultScheme: defaultScheme, + DefaultPath: defaultPath, + }.Build() +) + +// init registers the MetricSet with the central registry as soon as the program +// starts. The New function will be called later to instantiate an instance of +// the MetricSet for each host defined in the module's configuration. After the +// MetricSet has been created then Fetch will begin to be called periodically. +func init() { + if err := mb.Registry.AddMetricSet("rabbitmq", "exchange", New, hostParser); err != nil { + panic(err) + } +} + +// MetricSet holds any configuration or state information. It must implement +// the mb.MetricSet interface. And this is best achieved by embedding +// mb.BaseMetricSet because it implements all of the required mb.MetricSet +// interface methods except for Fetch. +type MetricSet struct { + mb.BaseMetricSet + *helper.HTTP +} + +// New creates a new instance of the MetricSet. New is responsible for unpacking +// any MetricSet specific configuration options if there are any. +func New(base mb.BaseMetricSet) (mb.MetricSet, error) { + cfgwarn.Beta("The rabbitmq exchange metricset is beta") + + http, err := helper.NewHTTP(base) + if err != nil { + return nil, err + } + http.SetHeader("Accept", "application/json") + + return &MetricSet{ + base, + http, + }, nil +} + +// Fetch methods implements the data gathering and data conversion to the right +// format. It publishes the event which is then forwarded to the output. In case +// of an error set the Error field of mb.Event or simply call report.Error(). +func (m *MetricSet) Fetch() ([]common.MapStr, error) { + content, err := m.HTTP.FetchContent() + + if err != nil { + return nil, err + } + + events, _ := eventsMapping(content) + return events, nil +} diff --git a/metricbeat/module/rabbitmq/exchange/exchange_integration_test.go b/metricbeat/module/rabbitmq/exchange/exchange_integration_test.go new file mode 100644 index 000000000000..18ae219885d5 --- /dev/null +++ b/metricbeat/module/rabbitmq/exchange/exchange_integration_test.go @@ -0,0 +1,65 @@ +package exchange + +import ( + "fmt" + "os" + "testing" + + "github.com/elastic/beats/libbeat/common" + mbtest "github.com/elastic/beats/metricbeat/mb/testing" +) + +func TestData(t *testing.T) { + f := mbtest.NewEventsFetcher(t, getConfig()) + err := mbtest.WriteEventsCond(f, t, func(e common.MapStr) bool { + hasIn, _ := e.HasKey("messages.publish_in") + hasOut, _ := e.HasKey("messages.publish_out") + return hasIn && hasOut + }) + if err != nil { + t.Fatal("write", err) + } +} + +func getConfig() map[string]interface{} { + return map[string]interface{}{ + "module": "rabbitmq", + "metricsets": []string{"exchange"}, + "hosts": getTestRabbitMQHost(), + "username": getTestRabbitMQUsername(), + "password": getTestRabbitMQPassword(), + } +} + +const ( + rabbitmqDefaultHost = "localhost" + rabbitmqDefaultPort = "15672" + rabbitmqDefaultUsername = "guest" + rabbitmqDefaultPassword = "guest" +) + +func getTestRabbitMQHost() string { + return fmt.Sprintf("%v:%v", + getenv("RABBITMQ_HOST", rabbitmqDefaultHost), + getenv("RABBITMQ_PORT", rabbitmqDefaultPort), + ) +} + +func getTestRabbitMQUsername() string { + return getenv("RABBITMQ_USERNAME", rabbitmqDefaultUsername) +} + +func getTestRabbitMQPassword() string { + return getenv("RABBITMQ_PASSWORD", rabbitmqDefaultPassword) +} + +func getenv(name, defaultValue string) string { + return strDefault(os.Getenv(name), defaultValue) +} + +func strDefault(a, defaults string) string { + if len(a) == 0 { + return defaults + } + return a +} diff --git a/metricbeat/module/rabbitmq/exchange/exchange_test.go b/metricbeat/module/rabbitmq/exchange/exchange_test.go new file mode 100644 index 000000000000..994710127bed --- /dev/null +++ b/metricbeat/module/rabbitmq/exchange/exchange_test.go @@ -0,0 +1,60 @@ +package exchange + +import ( + "io/ioutil" + "net/http" + "net/http/httptest" + "path/filepath" + "testing" + + "github.com/elastic/beats/libbeat/common" + mbtest "github.com/elastic/beats/metricbeat/mb/testing" + + "github.com/stretchr/testify/assert" +) + +func TestFetchEventContents(t *testing.T) { + absPath, err := filepath.Abs("../_meta/testdata/") + + response, err := ioutil.ReadFile(absPath + "/exchange_sample_response.json") + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json;") + w.Write([]byte(response)) + })) + defer server.Close() + + config := map[string]interface{}{ + "module": "rabbitmq", + "metricsets": []string{"exchange"}, + "hosts": []string{server.URL}, + } + + f := mbtest.NewEventsFetcher(t, config) + events, err := f.Fetch() + event := events[0] + if !assert.NoError(t, err) { + t.FailNow() + } + + t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event.StringToPrint()) + + messagesExpected := common.MapStr{ + "publish_in": common.MapStr{ + "count": int64(100), + "details": common.MapStr{"rate": float64(0.5)}, + }, + "publish_out": common.MapStr{ + "count": int64(99), + "details": common.MapStr{"rate": float64(0.9)}, + }, + } + + assert.Equal(t, "exchange.name", event["name"]) + assert.Equal(t, "guest", event["user"]) + assert.Equal(t, "/", event["vhost"]) + assert.Equal(t, true, event["durable"]) + assert.Equal(t, false, event["auto_delete"]) + assert.Equal(t, false, event["internal"]) + assert.Equal(t, messagesExpected, event["messages"]) +} diff --git a/metricbeat/module/rabbitmq/node/data.go b/metricbeat/module/rabbitmq/node/data.go index 45df9c62b72c..a398bba528ae 100644 --- a/metricbeat/module/rabbitmq/node/data.go +++ b/metricbeat/module/rabbitmq/node/data.go @@ -134,6 +134,7 @@ func eventsMapping(content []byte) ([]common.MapStr, error) { err := json.Unmarshal(content, &nodes) if err != nil { logp.Err("Error: ", err) + return nil, err } events := []common.MapStr{} diff --git a/metricbeat/module/rabbitmq/queue/data.go b/metricbeat/module/rabbitmq/queue/data.go index 51986cebc404..bae0122e333d 100644 --- a/metricbeat/module/rabbitmq/queue/data.go +++ b/metricbeat/module/rabbitmq/queue/data.go @@ -69,6 +69,7 @@ func eventsMapping(content []byte) ([]common.MapStr, error) { err := json.Unmarshal(content, &queues) if err != nil { logp.Err("Error: ", err) + return nil, err } events := []common.MapStr{}