From a9aff6f1d8c0eff77b867a358f0fae0a641cbfb9 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 14 Nov 2019 13:04:52 -0800 Subject: [PATCH] Log monitoring bulk failures (#14356) * Log monitoring bulk failures * Renaming function * Simplifying type * Removing extraneous second value * Adding godoc comments * Adding CHANGELOG entry * Clarifying log messages * WIP: adding unit test stubs * Fleshing out unit tests --- CHANGELOG.next.asciidoc | 1 + .../monitoring/report/elasticsearch/client.go | 31 ++++++- libbeat/outputs/elasticsearch/bulkapi.go | 29 +++---- .../elasticsearch/bulkapi_mock_test.go | 8 +- libbeat/outputs/elasticsearch/client.go | 82 ++++++++++--------- libbeat/outputs/elasticsearch/client_test.go | 74 ++++++++++++++--- libbeat/outputs/elasticsearch/json_read.go | 77 ++++++++--------- 7 files changed, 192 insertions(+), 110 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 6ad37cf027e3..603232e85b1c 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -251,6 +251,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Update azure configuration example. {issue}14224[14224] - Fix cloudwatch metricset with names and dimensions in config. {issue}14376[14376] {pull}14391[14391] - Fix marshaling of ms-since-epoch values in `elasticsearch/cluster_stats` metricset. {pull}14378[14378] +- Log bulk failures from bulk API requests to monitoring cluster. {issue}14303[14303] {pull}14356[14356] *Packetbeat* diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index 37e00d4009b9..6f47c6b62095 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -20,6 +20,7 @@ package elasticsearch import ( "encoding/json" "fmt" + "net/http" "time" "github.com/pkg/errors" @@ -229,7 +230,12 @@ func (c *publishClient) publishBulk(event publisher.Event, typ string) error { // Currently one request per event is sent. Reason is that each event can contain different // interval params and X-Pack requires to send the interval param. // FIXME: index name (first param below) - _, err = c.es.BulkWith(getMonitoringIndexName(), "", nil, nil, bulk[:]) + result, err := c.es.BulkWith(getMonitoringIndexName(), "", nil, nil, bulk[:]) + if err != nil { + return err + } + + logBulkFailures(result, []report.Event{document}) return err } @@ -238,3 +244,26 @@ func getMonitoringIndexName() string { date := time.Now().Format("2006.01.02") return fmt.Sprintf(".monitoring-beats-%v-%s", version, date) } + +func logBulkFailures(result esout.BulkResult, events []report.Event) { + reader := esout.NewJSONReader(result) + err := esout.BulkReadToItems(reader) + if err != nil { + logp.Err("failed to parse monitoring bulk items: %v", err) + return + } + + for i := range events { + status, msg, err := esout.BulkReadItemStatus(reader) + if err != nil { + logp.Err("failed to parse monitoring bulk item status: %v", err) + return + } + switch { + case status < 300, status == http.StatusConflict: + continue + default: + logp.Warn("monitoring bulk item insert failed (i=%v, status=%v): %s", i, status, msg) + } + } +} diff --git a/libbeat/outputs/elasticsearch/bulkapi.go b/libbeat/outputs/elasticsearch/bulkapi.go index f013c2a113ac..48c174624306 100644 --- a/libbeat/outputs/elasticsearch/bulkapi.go +++ b/libbeat/outputs/elasticsearch/bulkapi.go @@ -19,6 +19,7 @@ package elasticsearch import ( "bytes" + "encoding/json" "io" "io/ioutil" "net/http" @@ -34,16 +35,15 @@ type bulkRequest struct { requ *http.Request } -type bulkResult struct { - raw []byte -} +// BulkResult contains the result of a bulk API request. +type BulkResult json.RawMessage // Bulk performs many index/delete operations in a single API call. // Implements: http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html func (conn *Connection) Bulk( index, docType string, params map[string]string, body []interface{}, -) (*QueryResult, error) { +) (BulkResult, error) { return conn.BulkWith(index, docType, params, nil, body) } @@ -56,7 +56,7 @@ func (conn *Connection) BulkWith( params map[string]string, metaBuilder MetaBuilder, body []interface{}, -) (*QueryResult, error) { +) (BulkResult, error) { if len(body) == 0 { return nil, nil } @@ -76,7 +76,7 @@ func (conn *Connection) BulkWith( if err != nil { return nil, err } - return readQueryResult(result.raw) + return result, nil } // SendMonitoringBulk creates a HTTP request to the X-Pack Monitoring API containing a bunch of @@ -85,7 +85,7 @@ func (conn *Connection) BulkWith( func (conn *Connection) SendMonitoringBulk( params map[string]string, body []interface{}, -) (*QueryResult, error) { +) (BulkResult, error) { if len(body) == 0 { return nil, nil } @@ -111,7 +111,7 @@ func (conn *Connection) SendMonitoringBulk( if err != nil { return nil, err } - return readQueryResult(result.raw) + return result, nil } func newBulkRequest( @@ -199,18 +199,9 @@ func (r *bulkRequest) Reset(body bodyEncoder) { body.AddHeader(&r.requ.Header) } -func (conn *Connection) sendBulkRequest(requ *bulkRequest) (int, bulkResult, error) { +func (conn *Connection) sendBulkRequest(requ *bulkRequest) (int, BulkResult, error) { status, resp, err := conn.execHTTPRequest(requ.requ) - if err != nil { - return status, bulkResult{}, err - } - - result, err := readBulkResult(resp) - return status, result, err -} - -func readBulkResult(obj []byte) (bulkResult, error) { - return bulkResult{obj}, nil + return status, BulkResult(resp), err } func bulkEncode(out bulkWriter, metaBuilder MetaBuilder, body []interface{}) error { diff --git a/libbeat/outputs/elasticsearch/bulkapi_mock_test.go b/libbeat/outputs/elasticsearch/bulkapi_mock_test.go index 2d0efa2fc5af..ca927db61c41 100644 --- a/libbeat/outputs/elasticsearch/bulkapi_mock_test.go +++ b/libbeat/outputs/elasticsearch/bulkapi_mock_test.go @@ -20,7 +20,6 @@ package elasticsearch import ( - "encoding/json" "fmt" "net/http" "os" @@ -34,7 +33,7 @@ func TestOneHostSuccessResp_Bulk(t *testing.T) { logp.TestingSetup(logp.WithSelectors("elasticsearch")) index := fmt.Sprintf("packetbeat-unittest-%d", os.Getpid()) - expectedResp, _ := json.Marshal(QueryResult{Ok: true, Index: index, Type: "type1", ID: "1", Version: 1, Created: true}) + expectedResp := []byte(`{"took":7,"errors":false,"items":[]}`) ops := []map[string]interface{}{ { @@ -61,13 +60,10 @@ func TestOneHostSuccessResp_Bulk(t *testing.T) { params := map[string]string{ "refresh": "true", } - resp, err := client.Bulk(index, "type1", params, body) + _, err := client.Bulk(index, "type1", params, body) if err != nil { t.Errorf("Bulk() returns error: %s", err) } - if !resp.Created { - t.Errorf("Bulk() fails: %s", resp) - } } func TestOneHost500Resp_Bulk(t *testing.T) { diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 847acd0bebdf..a010acdecb41 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -53,7 +53,7 @@ type Client struct { bulkRequ *bulkRequest // buffered json response reader - json jsonReader + json JSONReader // additional configs compressionLevel int @@ -128,6 +128,7 @@ var ( ) var ( + errExpectedItemsArray = errors.New("expected items array") errExpectedItemObject = errors.New("expected item response object") errExpectedStatusCode = errors.New("expected item status code") errUnexpectedEmptyObject = errors.New("empty object") @@ -360,7 +361,7 @@ func (client *Client) publishEvents( failedEvents = data stats.fails = len(failedEvents) } else { - client.json.init(result.raw) + client.json.init(result) failedEvents, stats = bulkCollectPublishFails(&client.json, data) } @@ -478,38 +479,11 @@ func getPipeline(event *beat.Event, pipelineSel *outil.Selector) (string, error) // event failed due to some error in the event itself (e.g. does not respect mapping), // the event will be dropped. func bulkCollectPublishFails( - reader *jsonReader, + reader *JSONReader, data []publisher.Event, ) ([]publisher.Event, bulkResultStats) { - if err := reader.expectDict(); err != nil { - logp.Err("Failed to parse bulk response: expected JSON object") - return nil, bulkResultStats{} - } - - // find 'items' field in response - for { - kind, name, err := reader.nextFieldName() - if err != nil { - logp.Err("Failed to parse bulk response") - return nil, bulkResultStats{} - } - - if kind == dictEnd { - logp.Err("Failed to parse bulk response: no 'items' field in response") - return nil, bulkResultStats{} - } - - // found items array -> continue - if bytes.Equal(name, nameItems) { - break - } - - reader.ignoreNext() - } - - // check items field is an array - if err := reader.expectArray(); err != nil { - logp.Err("Failed to parse bulk response: expected items array") + if err := BulkReadToItems(reader); err != nil { + logp.Err("failed to parse bulk response: %v", err.Error()) return nil, bulkResultStats{} } @@ -517,7 +491,7 @@ func bulkCollectPublishFails( failed := data[:0] stats := bulkResultStats{} for i := 0; i < count; i++ { - status, msg, err := itemStatus(reader) + status, msg, err := BulkReadItemStatus(reader) if err != nil { return nil, bulkResultStats{} } @@ -553,9 +527,43 @@ func bulkCollectPublishFails( return failed, stats } -func itemStatus(reader *jsonReader) (int, []byte, error) { +// BulkReadToItems reads the bulk response up to (but not including) items +func BulkReadToItems(reader *JSONReader) error { + if err := reader.ExpectDict(); err != nil { + return errExpectedObject + } + + // find 'items' field in response + for { + kind, name, err := reader.nextFieldName() + if err != nil { + return err + } + + if kind == dictEnd { + return errExpectedItemsArray + } + + // found items array -> continue + if bytes.Equal(name, nameItems) { + break + } + + reader.ignoreNext() + } + + // check items field is an array + if err := reader.ExpectArray(); err != nil { + return errExpectedItemsArray + } + + return nil +} + +// BulkReadItemStatus reads the status and error fields from the bulk item +func BulkReadItemStatus(reader *JSONReader) (int, []byte, error) { // skip outer dictionary - if err := reader.expectDict(); err != nil { + if err := reader.ExpectDict(); err != nil { return 0, nil, errExpectedItemObject } @@ -593,8 +601,8 @@ func itemStatus(reader *jsonReader) (int, []byte, error) { return status, msg, nil } -func itemStatusInner(reader *jsonReader) (int, []byte, error) { - if err := reader.expectDict(); err != nil { +func itemStatusInner(reader *JSONReader) (int, []byte, error) { + if err := reader.ExpectDict(); err != nil { return 0, nil, errExpectedItemObject } diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go index 0dce9928b15d..72ad6ee22f9c 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -41,8 +41,8 @@ import ( ) func readStatusItem(in []byte) (int, string, error) { - reader := newJSONReader(in) - code, msg, err := itemStatus(reader) + reader := NewJSONReader(in) + code, msg, err := BulkReadItemStatus(reader) return code, string(msg), err } @@ -102,7 +102,7 @@ func TestCollectPublishFailsNone(t *testing.T) { events[i] = publisher.Event{Content: beat.Event{Fields: event}} } - reader := newJSONReader(response) + reader := NewJSONReader(response) res, _ := bulkCollectPublishFails(reader, events) assert.Equal(t, 0, len(res)) } @@ -120,7 +120,7 @@ func TestCollectPublishFailMiddle(t *testing.T) { eventFail := publisher.Event{Content: beat.Event{Fields: common.MapStr{"field": 2}}} events := []publisher.Event{event, eventFail, event} - reader := newJSONReader(response) + reader := NewJSONReader(response) res, stats := bulkCollectPublishFails(reader, events) assert.Equal(t, 1, len(res)) if len(res) == 1 { @@ -141,7 +141,7 @@ func TestCollectPublishFailAll(t *testing.T) { event := publisher.Event{Content: beat.Event{Fields: common.MapStr{"field": 2}}} events := []publisher.Event{event, event, event} - reader := newJSONReader(response) + reader := NewJSONReader(response) res, stats := bulkCollectPublishFails(reader, events) assert.Equal(t, 3, len(res)) assert.Equal(t, events, res) @@ -183,7 +183,7 @@ func TestCollectPipelinePublishFail(t *testing.T) { event := publisher.Event{Content: beat.Event{Fields: common.MapStr{"field": 2}}} events := []publisher.Event{event} - reader := newJSONReader(response) + reader := NewJSONReader(response) res, _ := bulkCollectPublishFails(reader, events) assert.Equal(t, 1, len(res)) assert.Equal(t, events, res) @@ -201,7 +201,7 @@ func BenchmarkCollectPublishFailsNone(b *testing.B) { event := publisher.Event{Content: beat.Event{Fields: common.MapStr{"field": 1}}} events := []publisher.Event{event, event, event} - reader := newJSONReader(nil) + reader := NewJSONReader(nil) for i := 0; i < b.N; i++ { reader.init(response) res, _ := bulkCollectPublishFails(reader, events) @@ -224,7 +224,7 @@ func BenchmarkCollectPublishFailMiddle(b *testing.B) { eventFail := publisher.Event{Content: beat.Event{Fields: common.MapStr{"field": 2}}} events := []publisher.Event{event, eventFail, event} - reader := newJSONReader(nil) + reader := NewJSONReader(nil) for i := 0; i < b.N; i++ { reader.init(response) res, _ := bulkCollectPublishFails(reader, events) @@ -246,7 +246,7 @@ func BenchmarkCollectPublishFailAll(b *testing.B) { event := publisher.Event{Content: beat.Event{Fields: common.MapStr{"field": 2}}} events := []publisher.Event{event, event, event} - reader := newJSONReader(nil) + reader := NewJSONReader(nil) for i := 0; i < b.N; i++ { reader.init(response) res, _ := bulkCollectPublishFails(reader, events) @@ -265,7 +265,9 @@ func TestClientWithHeaders(t *testing.T) { // For incoming requests, the Host header is promoted to the // Request.Host field and removed from the Header map. assert.Equal(t, "myhost.local", r.Host) - fmt.Fprintln(w, "Hello, client") + + bulkResponse := `{"items":[{"index":{}},{"index":{}},{"index":{}}]}` + fmt.Fprintln(w, bulkResponse) requestCount++ })) defer ts.Close() @@ -446,3 +448,55 @@ func TestClientWithAPIKey(t *testing.T) { client.Ping() assert.Equal(t, "ApiKey aHlva0hHNEJmV2s1dmlLWjE3Mlg6bzQ1SlVreXVTLS15aVNBdXV4bDhVdw==", headers.Get("Authorization")) } + +func TestBulkReadToItems(t *testing.T) { + response := []byte(`{ + "errors": false, + "items": [ + {"create": {"status": 200}}, + {"create": {"status": 300}}, + {"create": {"status": 400}} + ]}`) + + reader := NewJSONReader(response) + + err := BulkReadToItems(reader) + assert.NoError(t, err) + + for status := 200; status <= 400; status += 100 { + err = reader.ExpectDict() + assert.NoError(t, err) + + kind, raw, err := reader.nextFieldName() + assert.NoError(t, err) + assert.Equal(t, mapKeyEntity, kind) + assert.Equal(t, []byte("create"), raw) + + err = reader.ExpectDict() + assert.NoError(t, err) + + kind, raw, err = reader.nextFieldName() + assert.NoError(t, err) + assert.Equal(t, mapKeyEntity, kind) + assert.Equal(t, []byte("status"), raw) + + code, err := reader.nextInt() + assert.NoError(t, err) + assert.Equal(t, status, code) + + _, _, err = reader.endDict() + assert.NoError(t, err) + + _, _, err = reader.endDict() + assert.NoError(t, err) + } +} + +func TestBulkReadItemStatus(t *testing.T) { + response := []byte(`{"create": {"status": 200}}`) + + reader := NewJSONReader(response) + code, _, err := BulkReadItemStatus(reader) + assert.NoError(t, err) + assert.Equal(t, 200, code) +} diff --git a/libbeat/outputs/elasticsearch/json_read.go b/libbeat/outputs/elasticsearch/json_read.go index 5b205f4c012f..896ec89f2fae 100644 --- a/libbeat/outputs/elasticsearch/json_read.go +++ b/libbeat/outputs/elasticsearch/json_read.go @@ -25,12 +25,12 @@ import ( // SAX like json parser. But instead of relying on callbacks, state machine // returns raw item plus entity. On top of state machine additional helper methods -// like expectDict, expectArray, nextFieldName and nextInt are available for +// like ExpectDict, ExpectArray, nextFieldName and nextInt are available for // low-level parsing/stepping through a json document. // // Due to parser simply stepping through the input buffer, almost no additional // allocations are required. -type jsonReader struct { +type JSONReader struct { streambuf.Buffer // parser state machine @@ -133,13 +133,14 @@ func (s state) String() string { return "unknown" } -func newJSONReader(in []byte) *jsonReader { - r := &jsonReader{} +// NewJSONReader returns a new JSONReader initialized with in +func NewJSONReader(in []byte) *JSONReader { + r := &JSONReader{} r.init(in) return r } -func (r *jsonReader) init(in []byte) { +func (r *JSONReader) init(in []byte) { r.Buffer.Init(in, true) r.currentState = startState r.states = r.statesBuf[:0] @@ -147,18 +148,18 @@ func (r *jsonReader) init(in []byte) { var whitespace = []byte(" \t\r\n") -func (r *jsonReader) skipWS() { +func (r *JSONReader) skipWS() { r.IgnoreSymbols(whitespace) } -func (r *jsonReader) pushState(next state) { +func (r *JSONReader) pushState(next state) { if r.currentState != failedState { r.states = append(r.states, r.currentState) } r.currentState = next } -func (r *jsonReader) popState() { +func (r *JSONReader) popState() { if len(r.states) == 0 { r.currentState = failedState } else { @@ -168,7 +169,8 @@ func (r *jsonReader) popState() { } } -func (r *jsonReader) expectDict() error { +// ExpectDict checks if the next entity is a json object +func (r *JSONReader) ExpectDict() error { e, _, err := r.step() if err != nil { @@ -182,7 +184,8 @@ func (r *jsonReader) expectDict() error { return nil } -func (r *jsonReader) expectArray() error { +// ExpectArray checks if the next entity is a json array +func (r *JSONReader) ExpectArray() error { e, _, err := r.step() if err != nil { return err @@ -195,7 +198,7 @@ func (r *jsonReader) expectArray() error { return nil } -func (r *jsonReader) nextFieldName() (entity, []byte, error) { +func (r *JSONReader) nextFieldName() (entity, []byte, error) { e, raw, err := r.step() if err != nil { return e, raw, err @@ -208,7 +211,7 @@ func (r *jsonReader) nextFieldName() (entity, []byte, error) { return e, raw, err } -func (r *jsonReader) nextInt() (int, error) { +func (r *JSONReader) nextInt() (int, error) { e, raw, err := r.step() if err != nil { return 0, err @@ -224,7 +227,7 @@ func (r *jsonReader) nextInt() (int, error) { } // ignore type of next element and return raw content. -func (r *jsonReader) ignoreNext() (raw []byte, err error) { +func (r *JSONReader) ignoreNext() (raw []byte, err error) { r.skipWS() snapshot := r.Snapshot() @@ -253,7 +256,7 @@ func (r *jsonReader) ignoreNext() (raw []byte, err error) { return bytes, nil } -func ignoreKind(r *jsonReader, kind entity) error { +func ignoreKind(r *JSONReader, kind entity) error { for { e, _, err := r.step() if err != nil { @@ -276,7 +279,7 @@ func ignoreKind(r *jsonReader, kind entity) error { } // step continues the JSON parser state machine until next entity has been parsed. -func (r *jsonReader) step() (entity, []byte, error) { +func (r *JSONReader) step() (entity, []byte, error) { r.skipWS() switch r.currentState { case failedState: @@ -298,11 +301,11 @@ func (r *jsonReader) step() (entity, []byte, error) { } } -func (r *jsonReader) stepFailing() (entity, []byte, error) { +func (r *JSONReader) stepFailing() (entity, []byte, error) { return failEntity, nil, r.Err() } -func (r *jsonReader) stepStart() (entity, []byte, error) { +func (r *JSONReader) stepStart() (entity, []byte, error) { c, err := r.PeekByte() if err != nil { return r.failWith(err) @@ -311,11 +314,11 @@ func (r *jsonReader) stepStart() (entity, []byte, error) { return r.tryStepPrimitive(c) } -func (r *jsonReader) stepArray() (entity, []byte, error) { +func (r *JSONReader) stepArray() (entity, []byte, error) { return r.doStepArray(true) } -func (r *jsonReader) stepArrayNext() (entity, []byte, error) { +func (r *JSONReader) stepArrayNext() (entity, []byte, error) { c, err := r.PeekByte() if err != nil { return r.failWith(errFailing) @@ -334,7 +337,7 @@ func (r *jsonReader) stepArrayNext() (entity, []byte, error) { } } -func (r *jsonReader) doStepArray(allowArrayEnd bool) (entity, []byte, error) { +func (r *JSONReader) doStepArray(allowArrayEnd bool) (entity, []byte, error) { c, err := r.PeekByte() if err != nil { return r.failWith(err) @@ -351,11 +354,11 @@ func (r *jsonReader) doStepArray(allowArrayEnd bool) (entity, []byte, error) { return r.tryStepPrimitive(c) } -func (r *jsonReader) stepDict() (entity, []byte, error) { +func (r *JSONReader) stepDict() (entity, []byte, error) { return r.doStepDict(true) } -func (r *jsonReader) doStepDict(allowEnd bool) (entity, []byte, error) { +func (r *JSONReader) doStepDict(allowEnd bool) (entity, []byte, error) { c, err := r.PeekByte() if err != nil { return r.failWith(err) @@ -375,7 +378,7 @@ func (r *jsonReader) doStepDict(allowEnd bool) (entity, []byte, error) { } } -func (r *jsonReader) stepDictValue() (entity, []byte, error) { +func (r *JSONReader) stepDictValue() (entity, []byte, error) { c, err := r.PeekByte() if err != nil { return r.failWith(err) @@ -385,7 +388,7 @@ func (r *jsonReader) stepDictValue() (entity, []byte, error) { return r.tryStepPrimitive(c) } -func (r *jsonReader) stepDictValueEnd() (entity, []byte, error) { +func (r *JSONReader) stepDictValueEnd() (entity, []byte, error) { c, err := r.PeekByte() if err != nil { return r.failWith(err) @@ -404,7 +407,7 @@ func (r *jsonReader) stepDictValueEnd() (entity, []byte, error) { } } -func (r *jsonReader) tryStepPrimitive(c byte) (entity, []byte, error) { +func (r *JSONReader) tryStepPrimitive(c byte) (entity, []byte, error) { switch c { case '{': // start dictionary return r.startDict() @@ -432,19 +435,19 @@ func (r *jsonReader) tryStepPrimitive(c byte) (entity, []byte, error) { } } -func (r *jsonReader) stepNull() (entity, []byte, error) { +func (r *JSONReader) stepNull() (entity, []byte, error) { return stepSymbol(r, nullValue, nullSymbol, errExpectedNull) } -func (r *jsonReader) stepTrue() (entity, []byte, error) { +func (r *JSONReader) stepTrue() (entity, []byte, error) { return stepSymbol(r, trueValue, trueSymbol, errExpectedTrue) } -func (r *jsonReader) stepFalse() (entity, []byte, error) { +func (r *JSONReader) stepFalse() (entity, []byte, error) { return stepSymbol(r, falseValue, falseSymbol, errExpectedFalse) } -func stepSymbol(r *jsonReader, e entity, symb []byte, fail error) (entity, []byte, error) { +func stepSymbol(r *JSONReader, e entity, symb []byte, fail error) (entity, []byte, error) { ok, err := r.MatchASCII(symb) if err != nil { return failEntity, nil, err @@ -457,7 +460,7 @@ func stepSymbol(r *jsonReader, e entity, symb []byte, fail error) (entity, []byt return e, nil, nil } -func (r *jsonReader) stepMapKey() (entity, []byte, error) { +func (r *JSONReader) stepMapKey() (entity, []byte, error) { e, key, err := r.stepString() if err != nil { return e, key, err @@ -479,7 +482,7 @@ func (r *jsonReader) stepMapKey() (entity, []byte, error) { return mapKeyEntity, key, nil } -func (r *jsonReader) stepString() (entity, []byte, error) { +func (r *JSONReader) stepString() (entity, []byte, error) { start := 1 for { idxQuote := r.IndexByteFrom(start, '"') @@ -499,36 +502,36 @@ func (r *jsonReader) stepString() (entity, []byte, error) { } } -func (r *jsonReader) startDict() (entity, []byte, error) { +func (r *JSONReader) startDict() (entity, []byte, error) { r.Advance(1) r.pushState(dictState) return dictStart, nil, nil } -func (r *jsonReader) endDict() (entity, []byte, error) { +func (r *JSONReader) endDict() (entity, []byte, error) { r.Advance(1) r.popState() return dictEnd, nil, nil } -func (r *jsonReader) startArray() (entity, []byte, error) { +func (r *JSONReader) startArray() (entity, []byte, error) { r.Advance(1) r.pushState(arrState) return arrStart, nil, nil } -func (r *jsonReader) endArray() (entity, []byte, error) { +func (r *JSONReader) endArray() (entity, []byte, error) { r.Advance(1) r.popState() return arrEnd, nil, nil } -func (r *jsonReader) failWith(err error) (entity, []byte, error) { +func (r *JSONReader) failWith(err error) (entity, []byte, error) { r.currentState = failedState return failEntity, nil, r.SetError(err) } -func (r *jsonReader) stepNumber() (entity, []byte, error) { +func (r *JSONReader) stepNumber() (entity, []byte, error) { snapshot := r.Snapshot() lenBefore := r.Len() isDouble := false