Skip to content

Commit

Permalink
Combine separate es bulk api implementations (#1834)
Browse files Browse the repository at this point in the history
* Combine separate es bulk api implementations

Until now (*Connection).Bulk and (*Client).PublishEvents seem to be based on
different implementations for dealing with bulk API. Only the former being
tested by unit and integration tests. This refactoring removes duplicate logic.
  • Loading branch information
Steffen Siering authored and ruflin committed Jun 14, 2016
1 parent d48d65e commit 09328f4
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 111 deletions.
196 changes: 112 additions & 84 deletions libbeat/outputs/elasticsearch/bulkapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,12 @@ package elasticsearch
import (
"bytes"
"encoding/json"

"github.com/elastic/beats/libbeat/logp"
"io"
"io/ioutil"
"net/http"
"strings"
)

// MetaBuilder creates meta data for bulk requests
type MetaBuilder func(interface{}) interface{}

type bulkRequest struct {
es *Connection
buf bytes.Buffer
enc *json.Encoder
path string
params map[string]string
}

type bulkMeta struct {
Index bulkMetaIndex `json:"index"`
}
Expand All @@ -27,38 +18,23 @@ type bulkMetaIndex struct {
DocType string `json:"_type"`
}

type BulkResult struct {
raw []byte
// Items []json.RawMessage `json:"items"`
}

func (r *bulkRequest) Send(meta, obj interface{}) error {
var err error
// MetaBuilder creates meta data for bulk requests
type MetaBuilder func(interface{}) interface{}

pos := r.buf.Len()
if err = r.enc.Encode(meta); err != nil {
return err
}
if err = r.enc.Encode(obj); err != nil {
r.buf.Truncate(pos) // remove meta object from buffer
}
return err
type bulkRequest struct {
requ *http.Request
}

func (r *bulkRequest) Flush() (int, BulkResult, error) {
if r.buf.Len() == 0 {
logp.Debug("elasticsearch", "Empty channel. Wait for more data.")
return 0, BulkResult{}, nil
}
type bulkBody interface {
Reader() io.Reader
}

status, resp, err := r.es.sendBulkRequest("POST", r.path, r.params, &r.buf)
if err != nil {
return status, BulkResult{}, err
}
r.buf.Truncate(0)
type bulkResult struct {
raw []byte
}

result, err := readBulkResult(resp)
return status, result, err
type jsonBulkBody struct {
buf *bytes.Buffer
}

// Bulk performs many index/delete operations in a single API call.
Expand All @@ -81,86 +57,138 @@ func (conn *Connection) BulkWith(
body []interface{},
) (*QueryResult, error) {
if len(body) == 0 {
logp.Debug("elasticsearch", "Empty channel. Wait for more data.")
return nil, nil
}

path, err := makePath(index, docType, "_bulk")
if err != nil {
bulkBody := newJSONBulkBody(nil)
if err := bulkEncode(bulkBody, metaBuilder, body); err != nil {
return nil, err
}

buf := bulkEncode(metaBuilder, body)
if buf.Len() == 0 {
logp.Debug("elasticsearch", "Empty channel. Wait for more data.")
return nil, nil
requ, err := newBulkRequest(conn.URL, index, docType, params, bulkBody)
if err != nil {
return nil, err
}

_, resp, err := conn.sendBulkRequest("POST", path, params, &buf)
_, result, err := conn.sendBulkRequest(requ)
if err != nil {
return nil, err
}
return readQueryResult(resp)
return readQueryResult(result.raw)
}

func (conn *Connection) startBulkRequest(
index string,
docType string,
func newBulkRequest(
urlStr string,
index, docType string,
params map[string]string,
body bulkBody,
) (*bulkRequest, error) {
path, err := makePath(index, docType, "_bulk")
if err != nil {
return nil, err
}

r := &bulkRequest{
es: conn,
path: path,
params: params,
url := makeURL(urlStr, path, params)

var reader io.Reader
if body != nil {
reader = body.Reader()
}

requ, err := http.NewRequest("POST", url, reader)
if err != nil {
return nil, err
}

return &bulkRequest{
requ: requ,
}, nil
}

func (r *bulkRequest) Reset(body bulkBody) {
bdy := body.Reader()

rc, ok := bdy.(io.ReadCloser)
if !ok && body != nil {
rc = ioutil.NopCloser(bdy)
}

switch v := bdy.(type) {
case *bytes.Buffer:
r.requ.ContentLength = int64(v.Len())
case *bytes.Reader:
r.requ.ContentLength = int64(v.Len())
case *strings.Reader:
r.requ.ContentLength = int64(v.Len())
}
r.enc = json.NewEncoder(&r.buf)
return r, nil

r.requ.Header = http.Header{}
r.requ.Body = rc
}

func (conn *Connection) sendBulkRequest(
method, path string,
params map[string]string,
buf *bytes.Buffer,
) (int, []byte, error) {
url := makeURL(conn.URL, path, params)
logp.Debug("elasticsearch", "Sending bulk request to %s", url)
func (conn *Connection) sendBulkRequest(requ *bulkRequest) (int, bulkResult, error) {
status, resp, err := conn.execHTTPRequest(requ.requ)
if err != nil {
return status, bulkResult{}, err
}

return conn.execRequest(method, url, buf)
result, err := readBulkResult(resp)
return status, result, err
}

func readBulkResult(obj []byte) (bulkResult, error) {
return bulkResult{obj}, nil
}

func bulkEncode(metaBuilder MetaBuilder, body []interface{}) bytes.Buffer {
var buf bytes.Buffer
enc := json.NewEncoder(&buf)
func newJSONBulkBody(buf *bytes.Buffer) *jsonBulkBody {
if buf == nil {
buf = bytes.NewBuffer(nil)
}
return &jsonBulkBody{buf}
}

func (b *jsonBulkBody) Reset() {
b.buf.Reset()
}

func (b *jsonBulkBody) Reader() io.Reader {
return b.buf
}

func (b *jsonBulkBody) AddRaw(raw interface{}) error {
enc := json.NewEncoder(b.buf)
return enc.Encode(raw)
}

func (b *jsonBulkBody) Add(meta, obj interface{}) error {
enc := json.NewEncoder(b.buf)
pos := b.buf.Len()

if err := enc.Encode(meta); err != nil {
b.buf.Truncate(pos)
return err
}
if err := enc.Encode(obj); err != nil {
b.buf.Truncate(pos)
return err
}
return nil
}
func bulkEncode(out *jsonBulkBody, metaBuilder MetaBuilder, body []interface{}) error {
if metaBuilder == nil {
for _, obj := range body {
pos := buf.Len()
if err := enc.Encode(obj); err != nil {
if err := out.AddRaw(obj); err != nil {
debug("Failed to encode message: %s", err)
buf.Truncate(pos)
return err
}
}
} else {
for _, obj := range body {
pos := buf.Len()
meta := metaBuilder(obj)
err := enc.Encode(meta)
if err == nil {
err = enc.Encode(obj)
}
if err != nil {
if err := out.Add(meta, obj); err != nil {
debug("Failed to encode message: %s", err)
buf.Truncate(pos)
}
}
}
return buf
}

func readBulkResult(obj []byte) (BulkResult, error) {
return BulkResult{obj}, nil
return nil
}
Loading

0 comments on commit 09328f4

Please sign in to comment.