Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filebeat: add TLS + pipeline options to checkpoint module #1

Closed
wants to merge 12 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ processing events. (CVE-2019-17596) See https://www.elastic.co/community/securit
- Add Kibana Dashboard for MISP module. {pull}14147[14147]
- Add support for gzipped files in S3 input {pull}13980[13980]
- Add Filebeat Azure Dashboards {pull}14127[14127]
- Add support for thread ID in Filebeat Kafka module. {pull}19463[19463]


*Heartbeat*
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Okta module now requires objects instead of JSON strings for the `http_headers`, `http_request_body`, `pagination`, `rate_limit`, and `ssl` variables. {pull}18953[18953]
- Adds oauth support for httpjson input. {issue}18415[18415] {pull}18892[18892]
- Adds `split_events_by` option to httpjson input. {pull}19246[19246]
- Adds `date_cursor` option to httpjson input. {pull}19483[19483]

*Heartbeat*

Expand Down Expand Up @@ -310,6 +311,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- When using the `decode_json_fields` processor, decoded fields are now deep-merged into existing event. {pull}17958[17958]
- Add backoff configuration options for the Kafka output. {issue}16777[16777] {pull}17808[17808]
- Add TLS support to Kerberos authentication in Elasticsearch. {pull}18607[18607]
- Change ownership of files in docker images so they can be used in secured environments. {pull}12905[12905]
- Upgrade k8s.io/client-go and k8s keystore tests. {pull}18817[18817]
- Add support for multiple sets of hints on autodiscover {pull}18883[18883]

Expand Down
3,369 changes: 652 additions & 2,717 deletions NOTICE.txt

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ your dev environment to build Beats from the source.

## Snapshots

For testing purposes, we generate snapshot builds that you can find [here](https://beats-ci.elastic.co/job/elastic+beats+master+multijob-package-linux/lastSuccessfulBuild/gcsObjects/). Please be aware that these are built on top of master and are not meant for production.
For testing purposes, we generate snapshot builds that you can find [here](https://beats-ci.elastic.co/job/Beats/job/packaging/job/master/lastSuccessfulBuild/gcsObjects/). Please be aware that these are built on top of master and are not meant for production.

## CI

Expand Down
5 changes: 3 additions & 2 deletions deploy/kubernetes/auditbeat-kubernetes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,15 @@ spec:
path: /etc
- name: config
configMap:
defaultMode: 0600
defaultMode: 0640
name: auditbeat-config
- name: modules
configMap:
defaultMode: 0600
defaultMode: 0640
name: auditbeat-daemonset-modules
- name: data
hostPath:
# When auditbeat runs as non-root user, this directory needs to be writable by group (g+w).
path: /var/lib/auditbeat-data
type: DirectoryOrCreate
- name: run-containerd
Expand Down
5 changes: 3 additions & 2 deletions deploy/kubernetes/auditbeat/auditbeat-daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,15 @@ spec:
path: /etc
- name: config
configMap:
defaultMode: 0600
defaultMode: 0640
name: auditbeat-config
- name: modules
configMap:
defaultMode: 0600
defaultMode: 0640
name: auditbeat-daemonset-modules
- name: data
hostPath:
# When auditbeat runs as non-root user, this directory needs to be writable by group (g+w).
path: /var/lib/auditbeat-data
type: DirectoryOrCreate
- name: run-containerd
Expand Down
3 changes: 2 additions & 1 deletion deploy/kubernetes/filebeat-kubernetes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ spec:
volumes:
- name: config
configMap:
defaultMode: 0600
defaultMode: 0640
name: filebeat-config
- name: varlibdockercontainers
hostPath:
Expand All @@ -123,6 +123,7 @@ spec:
# data folder stores a registry of read status for all files, so we don't send everything again on a Filebeat pod restart
- name: data
hostPath:
# When filebeat runs as non-root user, this directory needs to be writable by group (g+w).
path: /var/lib/filebeat-data
type: DirectoryOrCreate
---
Expand Down
3 changes: 2 additions & 1 deletion deploy/kubernetes/filebeat/filebeat-daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ spec:
volumes:
- name: config
configMap:
defaultMode: 0600
defaultMode: 0640
name: filebeat-config
- name: varlibdockercontainers
hostPath:
Expand All @@ -79,5 +79,6 @@ spec:
# data folder stores a registry of read status for all files, so we don't send everything again on a Filebeat pod restart
- name: data
hostPath:
# When filebeat runs as non-root user, this directory needs to be writable by group (g+w).
path: /var/lib/filebeat-data
type: DirectoryOrCreate
9 changes: 5 additions & 4 deletions deploy/kubernetes/metricbeat-kubernetes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -177,14 +177,15 @@ spec:
path: /var/run/docker.sock
- name: config
configMap:
defaultMode: 0600
defaultMode: 0640
name: metricbeat-daemonset-config
- name: modules
configMap:
defaultMode: 0600
defaultMode: 0640
name: metricbeat-daemonset-modules
- name: data
hostPath:
# When metricbeat runs as non-root user, this directory needs to be writable by group (g+w)
path: /var/lib/metricbeat-data
type: DirectoryOrCreate
---
Expand Down Expand Up @@ -302,11 +303,11 @@ spec:
volumes:
- name: config
configMap:
defaultMode: 0600
defaultMode: 0640
name: metricbeat-deployment-config
- name: modules
configMap:
defaultMode: 0600
defaultMode: 0640
name: metricbeat-deployment-modules
---
apiVersion: rbac.authorization.k8s.io/v1
Expand Down
5 changes: 3 additions & 2 deletions deploy/kubernetes/metricbeat/metricbeat-daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,14 @@ spec:
path: /var/run/docker.sock
- name: config
configMap:
defaultMode: 0600
defaultMode: 0640
name: metricbeat-daemonset-config
- name: modules
configMap:
defaultMode: 0600
defaultMode: 0640
name: metricbeat-daemonset-modules
- name: data
hostPath:
# When metricbeat runs as non-root user, this directory needs to be writable by group (g+w)
path: /var/lib/metricbeat-data
type: DirectoryOrCreate
4 changes: 2 additions & 2 deletions deploy/kubernetes/metricbeat/metricbeat-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ spec:
volumes:
- name: config
configMap:
defaultMode: 0600
defaultMode: 0640
name: metricbeat-deployment-config
- name: modules
configMap:
defaultMode: 0600
defaultMode: 0640
name: metricbeat-deployment-modules
4 changes: 2 additions & 2 deletions dev-tools/mage/fmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ func GoImports() error {
return err
}
} else {
if err := gotool.Get(
gotool.Get.Package(filepath.Join(GoImportsImportPath)),
if err := gotool.Install(
gotool.Install.Package(filepath.Join(GoImportsImportPath)),
); err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions dev-tools/notice/overrides.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@
{"name": "github.com/chzyer/logex", "licenceType": "MIT"}
{"name": "github.com/munnerz/goautoneg", "licenceType": "BSD-3-Clause"}
{"name": "github.com/pelletier/go-buffruneio", "licenceType": "MIT"}
{"name": "github.com/urso/magetools", "licenceType": "Apache-2.0"}
4 changes: 2 additions & 2 deletions dev-tools/packaging/templates/docker/Dockerfile.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ RUN chmod 755 /usr/local/bin/docker-entrypoint
RUN groupadd --gid 1000 {{ .BeatName }}

RUN mkdir {{ $beatHome }}/data {{ $beatHome }}/logs && \
chown -R root:{{ .BeatName }} {{ $beatHome }} && \
chown -R root:root {{ $beatHome }} && \
find {{ $beatHome }} -type d -exec chmod 0750 {} \; && \
find {{ $beatHome }} -type f -exec chmod 0640 {} \; && \
chmod 0750 {{ $beatBinary }} && \
Expand All @@ -43,7 +43,7 @@ RUN mkdir {{ $beatHome }}/data {{ $beatHome }}/logs && \
chmod 0770 {{ $beatHome }}/data {{ $beatHome }}/logs

{{- if ne .user "root" }}
RUN useradd -M --uid 1000 --gid 1000 --home {{ $beatHome }} {{ .user }}
RUN useradd -M --uid 1000 --gid 1000 --groups 0 --home {{ $beatHome }} {{ .user }}
{{- end }}
USER {{ .user }}

Expand Down
10 changes: 10 additions & 0 deletions filebeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -24755,6 +24755,16 @@ type: keyword
Java class the log is coming from.


type: keyword

--

*`kafka.log.thread`*::
+
--
Thread name the log is coming from.


type: keyword

--
Expand Down
2 changes: 2 additions & 0 deletions filebeat/docs/modules/kafka.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ This file is generated! See scripts/docs_collector.py
The +{modulename}+ module collects and parses the logs created by
https://kafka.apache.org/[Kafka].

The module has additional support for parsing thread ID from logs.

include::../include/what-happens.asciidoc[]

include::../include/gs-link.asciidoc[]
Expand Down
124 changes: 124 additions & 0 deletions filebeat/input/v2/input-cursor/cursor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package cursor

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestCursor_IsNew(t *testing.T) {
t.Run("true if key is not in store", func(t *testing.T) {
store := testOpenStore(t, "test", createSampleStore(t, nil))
defer store.Release()

cursor := makeCursor(store, store.Get("test::key"))
require.True(t, cursor.IsNew())
})

t.Run("true if key is in store but without cursor value", func(t *testing.T) {
store := testOpenStore(t, "test", createSampleStore(t, map[string]state{
"test::key": {Cursor: nil},
}))
defer store.Release()

cursor := makeCursor(store, store.Get("test::key"))
require.True(t, cursor.IsNew())
})

t.Run("false if key with cursor value is in persistent store", func(t *testing.T) {
store := testOpenStore(t, "test", createSampleStore(t, map[string]state{
"test::key": {Cursor: "test"},
}))
defer store.Release()

cursor := makeCursor(store, store.Get("test::key"))
require.False(t, cursor.IsNew())
})

t.Run("false if key with cursor value is in memory store only", func(t *testing.T) {
store := testOpenStore(t, "test", createSampleStore(t, map[string]state{
"test::key": {Cursor: nil},
}))
defer store.Release()

res := store.Get("test::key")
op, err := createUpdateOp(store, res, "test-state-update")
require.NoError(t, err)
defer op.done(1)

cursor := makeCursor(store, res)
require.False(t, cursor.IsNew())
})
}

func TestCursor_Unpack(t *testing.T) {
t.Run("nothing to unpack if key is new", func(t *testing.T) {
store := testOpenStore(t, "test", createSampleStore(t, nil))
defer store.Release()

var st string
cursor := makeCursor(store, store.Get("test::key"))

require.NoError(t, cursor.Unpack(&st))
require.Equal(t, "", st)
})

t.Run("unpack fails if types are not compatible", func(t *testing.T) {
store := testOpenStore(t, "test", createSampleStore(t, map[string]state{
"test::key": {Cursor: "test"},
}))
defer store.Release()

var st struct{ A uint }
cursor := makeCursor(store, store.Get("test::key"))
require.Error(t, cursor.Unpack(&st))
})

t.Run("unpack from state in persistent store", func(t *testing.T) {
store := testOpenStore(t, "test", createSampleStore(t, map[string]state{
"test::key": {Cursor: "test"},
}))
defer store.Release()

var st string
cursor := makeCursor(store, store.Get("test::key"))

require.NoError(t, cursor.Unpack(&st))
require.Equal(t, "test", st)
})

t.Run("unpack from in memory state if updates are pending", func(t *testing.T) {
store := testOpenStore(t, "test", createSampleStore(t, map[string]state{
"test::key": {Cursor: "test"},
}))
defer store.Release()

res := store.Get("test::key")
op, err := createUpdateOp(store, res, "test-state-update")
require.NoError(t, err)
defer op.done(1)

var st string
cursor := makeCursor(store, store.Get("test::key"))

require.NoError(t, cursor.Unpack(&st))
require.Equal(t, "test-state-update", st)
})
}
36 changes: 36 additions & 0 deletions filebeat/input/v2/input-cursor/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/transform/typeconv"
)

// Publisher is used to publish an event and update the cursor in a single call to Publish.
Expand Down Expand Up @@ -70,3 +71,38 @@ func (c *cursorPublisher) Publish(event beat.Event, cursorUpdate interface{}) er
func (op *updateOp) Execute(numEvents uint) {
panic("TODO: implement me")
}

func createUpdateOp(store *store, resource *resource, updates interface{}) (*updateOp, error) {
ts := time.Now()

resource.stateMutex.Lock()
defer resource.stateMutex.Unlock()

cursor := resource.pendingCursor
if resource.activeCursorOperations == 0 {
var tmp interface{}
typeconv.Convert(&tmp, cursor)
resource.pendingCursor = tmp
cursor = tmp
}
if err := typeconv.Convert(&cursor, updates); err != nil {
return nil, err
}
resource.pendingCursor = cursor

resource.Retain()
resource.activeCursorOperations++
return &updateOp{
resource: resource,
store: store,
timestamp: ts,
delta: updates,
}, nil
}

// done releases resources held by the last N updateOps.
func (op *updateOp) done(n uint) {
op.resource.UpdatesReleaseN(n)
op.resource = nil
*op = updateOp{}
}
Loading