Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[filebeat][gcs] Added support for more mime types, offset tracking via cursor, automatic splitting at root level #34155

Merged
merged 15 commits into from
Jan 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]


*Filebeat*
- [GCS] Added support for more mime types & introduced offset tracking via cursor state. Also added support for
automatic splitting at root level, if root level element is an array. {pull}34155[34155]
- [httpjson] Improved error handling during pagination with chaining & split processor {pull}34127[34127]
- [Azure blob storage] Added support for more mime types & introduced offset tracking via cursor state. {pull}33981[33981]
- Fix EOF on single line not producing any event. {issue}30436[30436] {pull}33568[33568]
Expand Down
8 changes: 5 additions & 3 deletions x-pack/filebeat/docs/inputs/input-gcs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ even though it can get expensive with dealing with a very large number of files.
describing said error.

[id="supported-types-gcs"]
NOTE: Currently only `JSON` is supported with respect to object/file formats. As for authentication types, we currently have support for
`json credential keys` and `credential files`. If a download for a file/object fails or gets interrupted, the download is retried for 2 times.
This is currently not user configurable.
NOTE: Currently only `JSON` and `NDJSON` are supported object/file formats. Objects/files may also be gzip compressed.
"JSON credential keys" and "credential files" are supported authentication types.
If an array is present as the root object for an object/file, it is automatically split into individual objects and processed.
If a download for a file/object fails or gets interrupted, the download is retried 2 times. This is currently not user configurable.


[id="basic-config-gcs"]
Expand Down Expand Up @@ -90,6 +91,7 @@ calls to the bucket list api if it exceeds the given value. Each iteration consi
"_id": "gcs-test-new-data_3.json-worker-1"
},
"log": {
"offset": 200,
"file": {
"path": "gs://gcs-test-new/data_3.json"
}
Expand Down
52 changes: 17 additions & 35 deletions x-pack/filebeat/input/gcs/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,75 +21,58 @@ var (
errUnsupportedType = errors.New("only JSON objects are accepted")
)

// httpReadJSON accepts json file data in the form of an io.Reader, decodes the json data and returns the decoded
// data in the form of an object represented by a map[string]interface{}. Along with the objectified json data,
// this function also returns the raw json message in binary format. The status value in the return parameter
// is the http status value and have the following values : -
// 1) - StatusNotAcceptable (406) if body is missing
// 2) - StatusBadRequest (400) if the decoding fails
// 3) - StatusOK (200) if the decoding is successful
func httpReadJSON(body io.Reader) (objs []mapstr.M, rawMessages []json.RawMessage, status int, err error) {
// decodeJSON accepts json file data in the form of an io.Reader, decodes the json data and returns the decoded
// data in the form of an object represented by a map[string]interface{}.
func decodeJSON(body io.Reader) ([]mapstr.M, error) {
if body == http.NoBody {
return nil, nil, http.StatusNotAcceptable, errBodyEmpty
return nil, errBodyEmpty
}
obj, rawMessage, err := decodeJSON(body)
if err != nil {
return nil, nil, http.StatusBadRequest, err
}
return obj, rawMessage, http.StatusOK, err
}

func decodeJSON(body io.Reader) ([]mapstr.M, []json.RawMessage, error) {
var objs []mapstr.M
var rawMessages []json.RawMessage
decoder := json.NewDecoder(body)
for decoder.More() {
var raw json.RawMessage
if err := decoder.Decode(&raw); err != nil {
if err == io.EOF { //nolint:errorlint // This will never be a wrapped error.
break
}
return nil, nil, fmt.Errorf("malformed JSON object at stream position %d: %w", decoder.InputOffset(), err)
return nil, fmt.Errorf("malformed JSON object at stream position %d: %w", decoder.InputOffset(), err)
}

var obj interface{}
if err := newJSONDecoder(bytes.NewReader(raw)).Decode(&obj); err != nil {
return nil, nil, fmt.Errorf("malformed JSON object at stream position %d: %w", decoder.InputOffset(), err)
return nil, fmt.Errorf("malformed JSON object at stream position %d: %w", decoder.InputOffset(), err)
}
switch v := obj.(type) {
case map[string]interface{}:
objs = append(objs, v)
rawMessages = append(rawMessages, raw)
case []interface{}:
nobjs, nrawMessages, err := decodeJSONArray(bytes.NewReader(raw), decoder.InputOffset())
nobjs, err := decodeJSONArray(bytes.NewReader(raw), decoder.InputOffset())
if err != nil {
return nil, nil, fmt.Errorf("recursive error %d: %w", decoder.InputOffset(), err)
return nil, fmt.Errorf("recursive error %d: %w", decoder.InputOffset(), err)
}
objs = append(objs, nobjs...)
rawMessages = append(rawMessages, nrawMessages...)
default:
return nil, nil, errUnsupportedType
return nil, errUnsupportedType
}
}
for i := range objs {
jsontransform.TransformNumbers(objs[i])
}
return objs, rawMessages, nil
return objs, nil
}

func decodeJSONArray(raw *bytes.Reader, parentOffset int64) ([]mapstr.M, []json.RawMessage, error) {
func decodeJSONArray(raw *bytes.Reader, parentOffset int64) ([]mapstr.M, error) {
var objs []mapstr.M
var rawMessages []json.RawMessage
dec := newJSONDecoder(raw)
token, err := dec.Token()
if err != nil {
if err == io.EOF { //nolint:errorlint // This will never be a wrapped error.
return nil, nil, nil
return nil, nil
}
return nil, nil, fmt.Errorf("failed reading JSON array: %w", err)
return nil, fmt.Errorf("failed reading JSON array: %w", err)
}
if token != json.Delim('[') {
return nil, nil, fmt.Errorf("malformed JSON array, not starting with delimiter [ at position: %d", parentOffset+dec.InputOffset())
return nil, fmt.Errorf("malformed JSON array, not starting with delimiter [ at position: %d", parentOffset+dec.InputOffset())
}

for dec.More() {
Expand All @@ -98,21 +81,20 @@ func decodeJSONArray(raw *bytes.Reader, parentOffset int64) ([]mapstr.M, []json.
if err == io.EOF { //nolint:errorlint // This will never be a wrapped error.
break
}
return nil, nil, fmt.Errorf("malformed JSON object at stream position %d: %w", parentOffset+dec.InputOffset(), err)
return nil, fmt.Errorf("malformed JSON object at stream position %d: %w", parentOffset+dec.InputOffset(), err)
}

var obj interface{}
if err := newJSONDecoder(bytes.NewReader(raw)).Decode(&obj); err != nil {
return nil, nil, fmt.Errorf("malformed JSON object at stream position %d: %w", parentOffset+dec.InputOffset(), err)
return nil, fmt.Errorf("malformed JSON object at stream position %d: %w", parentOffset+dec.InputOffset(), err)
}

m, ok := obj.(map[string]interface{})
if ok {
rawMessages = append(rawMessages, raw)
objs = append(objs, m)
}
}
return objs, rawMessages, nil
return objs, nil
}

func newJSONDecoder(r io.Reader) *json.Decoder {
Expand Down
96 changes: 29 additions & 67 deletions x-pack/filebeat/input/gcs/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
package gcs

import (
"encoding/json"
"net/http"
"strings"
"testing"

Expand All @@ -17,123 +15,87 @@ import (

func Test_httpReadJSON(t *testing.T) {
tests := []struct {
name string
body string
wantObjs []mapstr.M
wantStatus int
wantErr bool
wantRawMessage []json.RawMessage
name string
body string
wantObjs []mapstr.M
wantErr bool
}{
{
name: "single object",
body: `{"a": 42, "b": "c"}`,
wantObjs: []mapstr.M{{"a": int64(42), "b": "c"}},
wantStatus: http.StatusOK,
name: "single object",
body: `{"a": 42, "b": "c"}`,
wantObjs: []mapstr.M{{"a": int64(42), "b": "c"}},
},
{
name: "array accepted",
body: `[{"a":"b"},{"c":"d"}]`,
wantObjs: []mapstr.M{{"a": "b"}, {"c": "d"}},
wantStatus: http.StatusOK,
name: "array accepted",
body: `[{"a":"b"},{"c":"d"}]`,
wantObjs: []mapstr.M{{"a": "b"}, {"c": "d"}},
},
{
name: "not an object not accepted",
body: `42`,
wantStatus: http.StatusBadRequest,
wantErr: true,
name: "not an object not accepted",
body: `42`,
wantErr: true,
},
{
name: "not an object mixed",
body: `[{a:1},
42,
{a:2}]`,
wantStatus: http.StatusBadRequest,
wantErr: true,
wantErr: true,
},
{
name: "sequence of objects accepted (CRLF)",
body: `{"a":1}` + "\r" + `{"a":2}`,
wantObjs: []mapstr.M{{"a": int64(1)}, {"a": int64(2)}},
wantStatus: http.StatusOK,
name: "sequence of objects accepted (CRLF)",
body: `{"a":1}` + "\r" + `{"a":2}`,
wantObjs: []mapstr.M{{"a": int64(1)}, {"a": int64(2)}},
},
{
name: "sequence of objects accepted (LF)",
body: `{"a":"1"}
{"a":"2"}`,
wantRawMessage: []json.RawMessage{
[]byte(`{"a":"1"}`),
[]byte(`{"a":"2"}`),
},
wantObjs: []mapstr.M{{"a": "1"}, {"a": "2"}},
wantStatus: http.StatusOK,
wantObjs: []mapstr.M{{"a": "1"}, {"a": "2"}},
},
{
name: "sequence of objects accepted (SP)",
body: `{"a":"2"} {"a":"2"}`,
wantObjs: []mapstr.M{{"a": "2"}, {"a": "2"}},
wantStatus: http.StatusOK,
name: "sequence of objects accepted (SP)",
body: `{"a":"2"} {"a":"2"}`,
wantObjs: []mapstr.M{{"a": "2"}, {"a": "2"}},
},
{
name: "sequence of objects accepted (no separator)",
body: `{"a":"2"}{"a":"2"}`,
wantObjs: []mapstr.M{{"a": "2"}, {"a": "2"}},
wantStatus: http.StatusOK,
name: "sequence of objects accepted (no separator)",
body: `{"a":"2"}{"a":"2"}`,
wantObjs: []mapstr.M{{"a": "2"}, {"a": "2"}},
},
{
name: "not an object in sequence",
body: `{"a":"2"}
42
{"a":"2"}`,
wantStatus: http.StatusBadRequest,
wantErr: true,
wantErr: true,
},
{
name: "array of objects in stream",
body: `{"a":"1"} [{"a":"2"},{"a":"3"}] {"a":"4"}`,
wantRawMessage: []json.RawMessage{
[]byte(`{"a":"1"}`),
[]byte(`{"a":"2"}`),
[]byte(`{"a":"3"}`),
[]byte(`{"a":"4"}`),
},
wantObjs: []mapstr.M{{"a": "1"}, {"a": "2"}, {"a": "3"}, {"a": "4"}},
wantStatus: http.StatusOK,
name: "array of objects in stream",
body: `{"a":"1"} [{"a":"2"},{"a":"3"}] {"a":"4"}`,
wantObjs: []mapstr.M{{"a": "1"}, {"a": "2"}, {"a": "3"}, {"a": "4"}},
},
{
name: "numbers",
body: `{"a":1} [{"a":false},{"a":3.14}] {"a":-4}`,
wantRawMessage: []json.RawMessage{
[]byte(`{"a":1}`),
[]byte(`{"a":false}`),
[]byte(`{"a":3.14}`),
[]byte(`{"a":-4}`),
},
wantObjs: []mapstr.M{
{"a": int64(1)},
{"a": false},
{"a": 3.14},
{"a": int64(-4)},
},
wantStatus: http.StatusOK,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotObjs, rawMessages, gotStatus, err := httpReadJSON(strings.NewReader(tt.body))
gotObjs, err := decodeJSON(strings.NewReader(tt.body))
if (err != nil) != tt.wantErr {
t.Errorf("httpReadJSON() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !assert.EqualValues(t, tt.wantObjs, gotObjs) {
t.Errorf("httpReadJSON() gotObjs = %v, want %v", gotObjs, tt.wantObjs)
}
if gotStatus != tt.wantStatus {
t.Errorf("httpReadJSON() gotStatus = %v, want %v", gotStatus, tt.wantStatus)
}
if tt.wantRawMessage != nil {
assert.Equal(t, tt.wantRawMessage, rawMessages)
}
assert.Equal(t, len(gotObjs), len(rawMessages))
})
}
}
Loading