Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.x] Log monitoring bulk failures (#14356) #14526

Merged
merged 2 commits into from
Nov 21, 2019
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Log monitoring bulk failures (#14356)
* Log monitoring bulk failures

* Renaming function

* Simplifying type

* Removing extraneous second value

* Adding godoc comments

* Adding CHANGELOG entry

* Clarifying log messages

* WIP: adding unit test stubs

* Fleshing out unit tests
  • Loading branch information
ycombinator committed Nov 20, 2019

Verified

This commit was signed with the committer’s verified signature. The key has expired.
andrewkroh Andrew Kroh
commit c5fcf1b2967962e227fe3746f69e4480cefe56c8
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
@@ -254,6 +254,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix checking tagsFilter using length in cloudwatch metricset. {pull}14525[14525]
- Fixed bug with `elasticsearch/cluster_stats` metricset not recording license expiration date correctly. {issue}14541[14541] {pull}14591[14591]
- Fixed bug with `elasticsearch/cluster_stats` metricset not recording license ID in the correct field. {pull}14592[14592]
- vSphere module splits `virtualmachine.host` into `virtualmachine.host.id` and `virtualmachine.host.hostname`. {issue}7187[7187] {pull}7213[7213]
- Log bulk failures from bulk API requests to monitoring cluster. {issue}14303[14303] {pull}14356[14356]

*Packetbeat*

31 changes: 30 additions & 1 deletion libbeat/monitoring/report/elasticsearch/client.go
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@ package elasticsearch
import (
"encoding/json"
"fmt"
"net/http"
"time"

"github.com/pkg/errors"
@@ -229,7 +230,12 @@ func (c *publishClient) publishBulk(event publisher.Event, typ string) error {
// Currently one request per event is sent. Reason is that each event can contain different
// interval params and X-Pack requires to send the interval param.
// FIXME: index name (first param below)
_, err = c.es.BulkWith(getMonitoringIndexName(), "", nil, nil, bulk[:])
result, err := c.es.BulkWith(getMonitoringIndexName(), "", nil, nil, bulk[:])
if err != nil {
return err
}

logBulkFailures(result, []report.Event{document})
return err
}

@@ -238,3 +244,26 @@ func getMonitoringIndexName() string {
date := time.Now().Format("2006.01.02")
return fmt.Sprintf(".monitoring-beats-%v-%s", version, date)
}

// logBulkFailures walks the item entries of a monitoring bulk response and
// logs a warning for every entry whose status indicates a failed insert.
//
// One item status is read from result per element of events; the events
// themselves are only used for their count and position. Statuses below 300
// and http.StatusConflict are not reported. If the response cannot be parsed,
// an error is logged and the remaining items are not inspected.
func logBulkFailures(result esout.BulkResult, events []report.Event) {
	reader := esout.NewJSONReader(result)
	if err := esout.BulkReadToItems(reader); err != nil {
		logp.Err("failed to parse monitoring bulk items: %v", err)
		return
	}

	for idx := 0; idx < len(events); idx++ {
		status, msg, err := esout.BulkReadItemStatus(reader)
		if err != nil {
			logp.Err("failed to parse monitoring bulk item status: %v", err)
			return
		}

		// Nothing to report for this item: move on to the next one.
		if status < 300 || status == http.StatusConflict {
			continue
		}

		logp.Warn("monitoring bulk item insert failed (i=%v, status=%v): %s", idx, status, msg)
	}
}
29 changes: 10 additions & 19 deletions libbeat/outputs/elasticsearch/bulkapi.go
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@ package elasticsearch

import (
"bytes"
"encoding/json"
"io"
"io/ioutil"
"net/http"
@@ -34,16 +35,15 @@ type bulkRequest struct {
requ *http.Request
}

type bulkResult struct {
raw []byte
}
// BulkResult contains the result of a bulk API request.
//
// It is defined on json.RawMessage, i.e. it holds the response as raw,
// not-yet-decoded JSON bytes; callers hand it to a JSON reader to inspect
// the individual item results.
type BulkResult json.RawMessage

// Bulk performs many index/delete operations in a single API call.
// Implements: http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
func (conn *Connection) Bulk(
index, docType string,
params map[string]string, body []interface{},
) (*QueryResult, error) {
) (BulkResult, error) {
return conn.BulkWith(index, docType, params, nil, body)
}

@@ -56,7 +56,7 @@ func (conn *Connection) BulkWith(
params map[string]string,
metaBuilder MetaBuilder,
body []interface{},
) (*QueryResult, error) {
) (BulkResult, error) {
if len(body) == 0 {
return nil, nil
}
@@ -76,7 +76,7 @@ func (conn *Connection) BulkWith(
if err != nil {
return nil, err
}
return readQueryResult(result.raw)
return result, nil
}

// SendMonitoringBulk creates a HTTP request to the X-Pack Monitoring API containing a bunch of
@@ -85,7 +85,7 @@ func (conn *Connection) BulkWith(
func (conn *Connection) SendMonitoringBulk(
params map[string]string,
body []interface{},
) (*QueryResult, error) {
) (BulkResult, error) {
if len(body) == 0 {
return nil, nil
}
@@ -111,7 +111,7 @@ func (conn *Connection) SendMonitoringBulk(
if err != nil {
return nil, err
}
return readQueryResult(result.raw)
return result, nil
}

func newBulkRequest(
@@ -199,18 +199,9 @@ func (r *bulkRequest) Reset(body bodyEncoder) {
body.AddHeader(&r.requ.Header)
}

func (conn *Connection) sendBulkRequest(requ *bulkRequest) (int, bulkResult, error) {
func (conn *Connection) sendBulkRequest(requ *bulkRequest) (int, BulkResult, error) {
status, resp, err := conn.execHTTPRequest(requ.requ)
if err != nil {
return status, bulkResult{}, err
}

result, err := readBulkResult(resp)
return status, result, err
}

func readBulkResult(obj []byte) (bulkResult, error) {
return bulkResult{obj}, nil
return status, BulkResult(resp), err
}

func bulkEncode(out bulkWriter, metaBuilder MetaBuilder, body []interface{}) error {
8 changes: 2 additions & 6 deletions libbeat/outputs/elasticsearch/bulkapi_mock_test.go
Original file line number Diff line number Diff line change
@@ -20,7 +20,6 @@
package elasticsearch

import (
"encoding/json"
"fmt"
"net/http"
"os"
@@ -34,7 +33,7 @@ func TestOneHostSuccessResp_Bulk(t *testing.T) {
logp.TestingSetup(logp.WithSelectors("elasticsearch"))

index := fmt.Sprintf("packetbeat-unittest-%d", os.Getpid())
expectedResp, _ := json.Marshal(QueryResult{Ok: true, Index: index, Type: "type1", ID: "1", Version: 1, Created: true})
expectedResp := []byte(`{"took":7,"errors":false,"items":[]}`)

ops := []map[string]interface{}{
{
@@ -61,13 +60,10 @@ func TestOneHostSuccessResp_Bulk(t *testing.T) {
params := map[string]string{
"refresh": "true",
}
resp, err := client.Bulk(index, "type1", params, body)
_, err := client.Bulk(index, "type1", params, body)
if err != nil {
t.Errorf("Bulk() returns error: %s", err)
}
if !resp.Created {
t.Errorf("Bulk() fails: %s", resp)
}
}

func TestOneHost500Resp_Bulk(t *testing.T) {
82 changes: 45 additions & 37 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
@@ -53,7 +53,7 @@ type Client struct {
bulkRequ *bulkRequest

// buffered json response reader
json jsonReader
json JSONReader

// additional configs
compressionLevel int
@@ -128,6 +128,7 @@ var (
)

var (
errExpectedItemsArray = errors.New("expected items array")
errExpectedItemObject = errors.New("expected item response object")
errExpectedStatusCode = errors.New("expected item status code")
errUnexpectedEmptyObject = errors.New("empty object")
@@ -360,7 +361,7 @@ func (client *Client) publishEvents(
failedEvents = data
stats.fails = len(failedEvents)
} else {
client.json.init(result.raw)
client.json.init(result)
failedEvents, stats = bulkCollectPublishFails(&client.json, data)
}

@@ -478,46 +479,19 @@ func getPipeline(event *beat.Event, pipelineSel *outil.Selector) (string, error)
// event failed due to some error in the event itself (e.g. does not respect mapping),
// the event will be dropped.
func bulkCollectPublishFails(
reader *jsonReader,
reader *JSONReader,
data []publisher.Event,
) ([]publisher.Event, bulkResultStats) {
if err := reader.expectDict(); err != nil {
logp.Err("Failed to parse bulk response: expected JSON object")
return nil, bulkResultStats{}
}

// find 'items' field in response
for {
kind, name, err := reader.nextFieldName()
if err != nil {
logp.Err("Failed to parse bulk response")
return nil, bulkResultStats{}
}

if kind == dictEnd {
logp.Err("Failed to parse bulk response: no 'items' field in response")
return nil, bulkResultStats{}
}

// found items array -> continue
if bytes.Equal(name, nameItems) {
break
}

reader.ignoreNext()
}

// check items field is an array
if err := reader.expectArray(); err != nil {
logp.Err("Failed to parse bulk response: expected items array")
if err := BulkReadToItems(reader); err != nil {
logp.Err("failed to parse bulk response: %v", err.Error())
return nil, bulkResultStats{}
}

count := len(data)
failed := data[:0]
stats := bulkResultStats{}
for i := 0; i < count; i++ {
status, msg, err := itemStatus(reader)
status, msg, err := BulkReadItemStatus(reader)
if err != nil {
return nil, bulkResultStats{}
}
@@ -553,9 +527,43 @@ func bulkCollectPublishFails(
return failed, stats
}

func itemStatus(reader *jsonReader) (int, []byte, error) {
// BulkReadToItems consumes the beginning of a bulk response from reader, up to
// the start of the "items" array. The items themselves are not read.
//
// It returns errExpectedObject if the response does not start with a JSON
// object, errExpectedItemsArray if the object ends without an "items" field or
// if that field is not an array, and the reader's own error if a field name
// cannot be read.
func BulkReadToItems(reader *JSONReader) error {
	if reader.ExpectDict() != nil {
		return errExpectedObject
	}

	// Scan the top-level fields, skipping every value until "items" shows up.
	for found := false; !found; {
		kind, name, err := reader.nextFieldName()
		switch {
		case err != nil:
			return err
		case kind == dictEnd:
			// Reached the end of the object without seeing "items".
			return errExpectedItemsArray
		case bytes.Equal(name, nameItems):
			found = true
		default:
			reader.ignoreNext()
		}
	}

	// The value of the "items" field must be an array.
	if reader.ExpectArray() != nil {
		return errExpectedItemsArray
	}

	return nil
}

// BulkReadItemStatus reads the status and error fields from the bulk item
func BulkReadItemStatus(reader *JSONReader) (int, []byte, error) {
// skip outer dictionary
if err := reader.expectDict(); err != nil {
if err := reader.ExpectDict(); err != nil {
return 0, nil, errExpectedItemObject
}

@@ -593,8 +601,8 @@ func itemStatus(reader *jsonReader) (int, []byte, error) {
return status, msg, nil
}

func itemStatusInner(reader *jsonReader) (int, []byte, error) {
if err := reader.expectDict(); err != nil {
func itemStatusInner(reader *JSONReader) (int, []byte, error) {
if err := reader.ExpectDict(); err != nil {
return 0, nil, errExpectedItemObject
}

Loading