diff --git a/libbeat/outputs/elasticsearch/bulkapi.go b/libbeat/outputs/elasticsearch/bulkapi.go index dbf9f287858f..a38f43ed6d4d 100644 --- a/libbeat/outputs/elasticsearch/bulkapi.go +++ b/libbeat/outputs/elasticsearch/bulkapi.go @@ -3,21 +3,12 @@ package elasticsearch import ( "bytes" "encoding/json" - - "github.com/elastic/beats/libbeat/logp" + "io" + "io/ioutil" + "net/http" + "strings" ) -// MetaBuilder creates meta data for bulk requests -type MetaBuilder func(interface{}) interface{} - -type bulkRequest struct { - es *Connection - buf bytes.Buffer - enc *json.Encoder - path string - params map[string]string -} - type bulkMeta struct { Index bulkMetaIndex `json:"index"` } @@ -27,38 +18,23 @@ type bulkMetaIndex struct { DocType string `json:"_type"` } -type BulkResult struct { - raw []byte - // Items []json.RawMessage `json:"items"` -} - -func (r *bulkRequest) Send(meta, obj interface{}) error { - var err error +// MetaBuilder creates meta data for bulk requests +type MetaBuilder func(interface{}) interface{} - pos := r.buf.Len() - if err = r.enc.Encode(meta); err != nil { - return err - } - if err = r.enc.Encode(obj); err != nil { - r.buf.Truncate(pos) // remove meta object from buffer - } - return err +type bulkRequest struct { + requ *http.Request } -func (r *bulkRequest) Flush() (int, BulkResult, error) { - if r.buf.Len() == 0 { - logp.Debug("elasticsearch", "Empty channel. Wait for more data.") - return 0, BulkResult{}, nil - } +type bulkBody interface { + Reader() io.Reader +} - status, resp, err := r.es.sendBulkRequest("POST", r.path, r.params, &r.buf) - if err != nil { - return status, BulkResult{}, err - } - r.buf.Truncate(0) +type bulkResult struct { + raw []byte +} - result, err := readBulkResult(resp) - return status, result, err +type jsonBulkBody struct { + buf *bytes.Buffer } // Bulk performs many index/delete operations in a single API call. @@ -81,86 +57,138 @@ func (conn *Connection) BulkWith( body []interface{}, ) (*QueryResult, error) { if len(body) == 0 { - logp.Debug("elasticsearch", "Empty channel. Wait for more data.") return nil, nil } - path, err := makePath(index, docType, "_bulk") - if err != nil { + bulkBody := newJSONBulkBody(nil) + if err := bulkEncode(bulkBody, metaBuilder, body); err != nil { return nil, err } - buf := bulkEncode(metaBuilder, body) - if buf.Len() == 0 { - logp.Debug("elasticsearch", "Empty channel. Wait for more data.") - return nil, nil + requ, err := newBulkRequest(conn.URL, index, docType, params, bulkBody) + if err != nil { + return nil, err } - _, resp, err := conn.sendBulkRequest("POST", path, params, &buf) + _, result, err := conn.sendBulkRequest(requ) if err != nil { return nil, err } - return readQueryResult(resp) + return readQueryResult(result.raw) } -func (conn *Connection) startBulkRequest( - index string, - docType string, +func newBulkRequest( + urlStr string, + index, docType string, params map[string]string, + body bulkBody, ) (*bulkRequest, error) { path, err := makePath(index, docType, "_bulk") if err != nil { return nil, err } - r := &bulkRequest{ - es: conn, - path: path, - params: params, + url := makeURL(urlStr, path, params) + + var reader io.Reader + if body != nil { + reader = body.Reader() + } + + requ, err := http.NewRequest("POST", url, reader) + if err != nil { + return nil, err + } + + return &bulkRequest{ + requ: requ, + }, nil +} + +func (r *bulkRequest) Reset(body bulkBody) { + bdy := body.Reader() + + rc, ok := bdy.(io.ReadCloser) + if !ok && body != nil { + rc = ioutil.NopCloser(bdy) + } + + switch v := bdy.(type) { + case *bytes.Buffer: + r.requ.ContentLength = int64(v.Len()) + case *bytes.Reader: + r.requ.ContentLength = int64(v.Len()) + case *strings.Reader: + r.requ.ContentLength = int64(v.Len()) } - r.enc = json.NewEncoder(&r.buf) - return r, nil + + r.requ.Header = http.Header{} + r.requ.Body = rc } -func (conn *Connection) sendBulkRequest( - method, path string, - params map[string]string, - buf *bytes.Buffer, -) (int, []byte, error) { - url := makeURL(conn.URL, path, params) - logp.Debug("elasticsearch", "Sending bulk request to %s", url) +func (conn *Connection) sendBulkRequest(requ *bulkRequest) (int, bulkResult, error) { + status, resp, err := conn.execHTTPRequest(requ.requ) + if err != nil { + return status, bulkResult{}, err + } - return conn.execRequest(method, url, buf) + result, err := readBulkResult(resp) + return status, result, err +} + +func readBulkResult(obj []byte) (bulkResult, error) { + return bulkResult{obj}, nil } -func bulkEncode(metaBuilder MetaBuilder, body []interface{}) bytes.Buffer { - var buf bytes.Buffer - enc := json.NewEncoder(&buf) +func newJSONBulkBody(buf *bytes.Buffer) *jsonBulkBody { + if buf == nil { + buf = bytes.NewBuffer(nil) + } + return &jsonBulkBody{buf} +} + +func (b *jsonBulkBody) Reset() { + b.buf.Reset() +} + +func (b *jsonBulkBody) Reader() io.Reader { + return b.buf +} + +func (b *jsonBulkBody) AddRaw(raw interface{}) error { + enc := json.NewEncoder(b.buf) + return enc.Encode(raw) +} + +func (b *jsonBulkBody) Add(meta, obj interface{}) error { + enc := json.NewEncoder(b.buf) + pos := b.buf.Len() + + if err := enc.Encode(meta); err != nil { + b.buf.Truncate(pos) + return err + } + if err := enc.Encode(obj); err != nil { + b.buf.Truncate(pos) + return err + } + return nil +} +func bulkEncode(out *jsonBulkBody, metaBuilder MetaBuilder, body []interface{}) error { if metaBuilder == nil { for _, obj := range body { - pos := buf.Len() - if err := enc.Encode(obj); err != nil { + if err := out.AddRaw(obj); err != nil { debug("Failed to encode message: %s", err) - buf.Truncate(pos) + return err } } } else { for _, obj := range body { - pos := buf.Len() meta := metaBuilder(obj) - err := enc.Encode(meta) - if err == nil { - err = enc.Encode(obj) - } - if err != nil { + if err := out.Add(meta, obj); err != nil { debug("Failed to encode message: %s", err) - buf.Truncate(pos) } } } - return buf -} - -func readBulkResult(obj []byte) (BulkResult, error) { - return BulkResult{obj}, nil + return nil } diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 7ab0002518c8..5c3eae2e97bc 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -13,8 +13,7 @@ import ( "net/url" "time" - humanize "github.com/dustin/go-humanize" - + "github.com/dustin/go-humanize" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs/mode" @@ -26,6 +25,11 @@ type Client struct { index string params map[string]string + // buffered bulk requests + bulkBody *jsonBulkBody + bulkRequ *bulkRequest + + // buffered json response reader json jsonReader } @@ -88,6 +92,11 @@ func NewClient( WriteErrors: statWriteErrors, }) + bulkRequ, err := newBulkRequest(esURL, "", "", params, nil) + if err != nil { + logp.Critical("Elasticsearch output not correctly initialized: %v", err) + } + client := &Client{ Connection: Connection{ URL: esURL, @@ -104,6 +113,9 @@ func NewClient( }, index: index, params: params, + + bulkBody: newJSONBulkBody(nil), + bulkRequ: bulkRequ, } client.Connection.onConnectCallback = func() error { @@ -137,48 +149,57 @@ func (client *Client) Clone() *Client { func (client *Client) PublishEvents( events []common.MapStr, ) ([]common.MapStr, error) { - begin := time.Now() publishEventsCallCount.Add(1) + if len(events) == 0 { + return nil, nil + } + if !client.connected { return events, ErrNotConnected } - // new request to store all events into - request, err := client.startBulkRequest("", "", client.params) - if err != nil { - logp.Err("Failed to perform any bulk index operations: %s", err) - return events, err - } + body := client.bulkBody + body.Reset() + + requ := client.bulkRequ + requ.Reset(body) // encode events into bulk request buffer, dropping failed elements from // events slice - events = bulkEncodePublishRequest(request, client.index, events) + events = bulkEncodePublishRequest(body, client.index, events) if len(events) == 0 { return nil, nil } - // send bulk request - bufferSize := request.buf.Len() - _, res, err := request.Flush() - if err != nil { - logp.Err("Failed to perform any bulk index operations: %s", err) - return events, err + status, result, sendErr := client.sendBulkRequest(requ) + if sendErr != nil { + logp.Err("Failed to perform any bulk index operations: %s", sendErr) + return events, sendErr } logp.Debug("elasticsearch", "PublishEvents: %d metrics have been packed into a buffer of %s and published to elasticsearch in %v.", len(events), - humanize.Bytes(uint64(bufferSize)), + humanize.Bytes(uint64(body.buf.Len())), time.Now().Sub(begin)) // check response for transient errors - client.json.init(res.raw) - failed_events := bulkCollectPublishFails(&client.json, events) - ackedEvents.Add(int64(len(events) - len(failed_events))) - eventsNotAcked.Add(int64(len(failed_events))) - if len(failed_events) > 0 { - return failed_events, mode.ErrTempBulkFailure + var failedEvents []common.MapStr + if status != 200 { + failedEvents = events + } else { + client.json.init(result.raw) + failedEvents = bulkCollectPublishFails(&client.json, events) + } + + ackedEvents.Add(int64(len(events) - len(failedEvents))) + eventsNotAcked.Add(int64(len(failedEvents))) + if len(failedEvents) > 0 { + if sendErr == nil { + sendErr = mode.ErrTempBulkFailure + } + return failedEvents, sendErr } return nil, nil @@ -187,14 +208,14 @@ func (client *Client) PublishEvents( // fillBulkRequest encodes all bulk requests and returns slice of events // successfully added to bulk request. func bulkEncodePublishRequest( - requ *bulkRequest, + body *jsonBulkBody, index string, events []common.MapStr, ) []common.MapStr { okEvents := events[:0] for _, event := range events { meta := eventBulkMeta(index, event) - err := requ.Send(meta, event) + err := body.Add(meta, event) if err != nil { logp.Err("Failed to encode event: %s", err) continue @@ -206,7 +227,6 @@ func bulkEncodePublishRequest( } func eventBulkMeta(index string, event common.MapStr) bulkMeta { - index = getIndex(event, index) meta := bulkMeta{ Index: bulkMetaIndex{ @@ -520,7 +540,10 @@ func (conn *Connection) execRequest( logp.Warn("Failed to create request", err) return 0, nil, err } + return conn.execHTTPRequest(req) +} +func (conn *Connection) execHTTPRequest(req *http.Request) (int, []byte, error) { req.Header.Add("Accept", "application/json") if conn.Username != "" || conn.Password != "" { req.SetBasicAuth(conn.Username, conn.Password) diff --git a/libbeat/outputs/elasticsearch/json_read.go b/libbeat/outputs/elasticsearch/json_read.go index 3b40f1e42edc..97ed18a80b25 100644 --- a/libbeat/outputs/elasticsearch/json_read.go +++ b/libbeat/outputs/elasticsearch/json_read.go @@ -57,7 +57,6 @@ const ( nullValue dictStart dictEnd - dictField arrStart arrEnd stringEntity