Skip to content

Commit

Permalink
[Filebeat][CometD] Add CometD input for Salesforce connector (#31492)
Browse files Browse the repository at this point in the history
* Add authentication mechanism for cometd input for Salesforce connector

* update based on comments: clear empty line and add error.Errorf with context

* Add data collection mechanism for Salesforce cometd input

* Update the approach of the channel

* Add asciidoc for cometd input

* Resolve review comments

* Update cometd input to import forked dependency

* Temorarily remove changelog entry

* Run mage check and update

* Update go.mod and go.sum

* Update go.mod and NOTICE.txt

* Make changes with respect to ownership transfer

* Update go.mod and go.sum

* Add two unit test cases and nit

* Add unit test cases and integration tests

* Address review comments

* Resolve some linting issues

* Resolve some linting issues

* Add unit test for input

* Add integration test

* Lint issues

* Lint

* Lint

* update bayeux v1.0.3

* Add NOTICE.txt

* Add negative test cases

* requested changes

* Resolve CI lint errors

* Update go.sum

* Update NOTICE.txt

* Update unit tests

* Update unit test name

* Update tests

* Remove test.out

* Use time.After to eliminate possible flaky test

* Add buffer of 1 in message channel

* Update tests based on bayeux client status watcher changes

* incorporate bayeux changes

* Update unit tests

* resolve unit tests

* additional tests with multiple inputs, added channel_name filed in comman cometd fields

* nit: Event struct -> event struct

* test case improvement

* update looger Error -> Errorw to print formatted errors

* update docs with reference links

* Add CHANGELOG entry and update docs

* Use mapstr.M instead of common.MapStr

* Changes based on Replace common.Config and friends with config.C

* resolve make check error

* resolve comman config errors

* update unit tests

* update multiple input unit test

* Update TestMultiInput unit test

* Update minor nit to re-trigger CI

* Resolve flanky tests

* Resolve linting issues

* Resolve tests -> send empty response {}

* nit: inputType -> expectedHTTPEventCount

* nit: wg -> waitForEventCollection

Co-authored-by: yug-crest <[email protected]>
Co-authored-by: yug-elastic <[email protected]>
Co-authored-by: Craig MacKenzie <[email protected]>
Co-authored-by: yug-elastic <[email protected]>
  • Loading branch information
5 people authored May 6, 2022
1 parent 06a8add commit 29c0d4c
Show file tree
Hide file tree
Showing 17 changed files with 990 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -5905,6 +5905,7 @@ https://github.com/elastic/beats/compare/v6.2.3\...v6.3.0[View commits]
- Add support human friendly size for the UDP input. {pull}6886[6886]
- Add Syslog input to ingest RFC3164 Events via TCP and UDP {pull}6842[6842]
- Remove the undefined `username` option from the Redis input and clarify the documentation. {pull}6662[6662]
- Add CometD input {pull}30089[30089]

*Heartbeat*

Expand Down
31 changes: 31 additions & 0 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5804,6 +5804,37 @@ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND



--------------------------------------------------------------------------------
Dependency : github.com/elastic/bayeux
Version: v1.0.5
Licence type (autodetected): MIT
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/[email protected]/LICENSE:

MIT License

Copyright (c) 2017 Zander Hill

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.


--------------------------------------------------------------------------------
Dependency : github.com/elastic/elastic-agent-client/v7
Version: v7.0.0-20210727140539-f0905d9377f6
Expand Down
3 changes: 3 additions & 0 deletions filebeat/docs/filebeat-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ You can configure {beatname_uc} to use the following inputs:
* <<{beatname_lc}-input-aws-s3>>
* <<{beatname_lc}-input-azure-eventhub>>
* <<{beatname_lc}-input-cloudfoundry>>
* <<{beatname_lc}-input-cometd>>
* <<{beatname_lc}-input-container>>
* <<{beatname_lc}-input-filestream>>
* <<{beatname_lc}-input-gcp-pubsub>>
Expand Down Expand Up @@ -94,6 +95,8 @@ include::../../x-pack/filebeat/docs/inputs/input-azure-eventhub.asciidoc[]

include::../../x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc[]

include::../../x-pack/filebeat/docs/inputs/input-cometd.asciidoc[]

include::inputs/input-container.asciidoc[]

include::inputs/input-filestream.asciidoc[]
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ require (
)

require (
github.com/elastic/bayeux v1.0.5
github.com/elastic/elastic-agent-libs v0.2.3
github.com/shirou/gopsutil/v3 v3.21.12
go.elastic.co/apm/module/apmelasticsearch/v2 v2.0.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,8 @@ github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7j
github.com/eclipse/paho.mqtt.golang v1.3.5 h1:sWtmgNxYM9P2sP+xEItMozsR3w0cqZFlqnNN1bdl41Y=
github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc=
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/elastic/bayeux v1.0.5 h1:UceFq01ipmT3S8DzFK+uVAkbCdiPR0Bqei8qIGmUeY0=
github.com/elastic/bayeux v1.0.5/go.mod h1:CSI4iP7qeo5MMlkznGvYKftp8M7qqP/3nzmVZoXHY68=
github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3 h1:lnDkqiRFKm0rxdljqrj3lotWinO9+jFmeDXIC4gvIQs=
github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3/go.mod h1:aPqzac6AYkipvp4hufTyMj5PDIphF3+At8zr7r51xjY=
github.com/elastic/elastic-agent-client/v7 v7.0.0-20210727140539-f0905d9377f6 h1:nFvXHBjYK3e9+xF0WKDeAKK4aOO51uC28s+L9rBmilo=
Expand Down
8 changes: 8 additions & 0 deletions testing/environments/docker/cometd/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
FROM docker.elastic.co/observability/stream:v0.6.1

RUN apt update && \
apt -y install curl
ENV PORT="8080"
COPY files /files
HEALTHCHECK --interval=1s --retries=600 CMD curl -X POST http://localhost:8080/token
CMD [ "http-server", "--addr=:8080", "--config=/files/config.yml" ]
27 changes: 27 additions & 0 deletions testing/environments/docker/cometd/files/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
rules:
- path: /token
methods: ["POST"]
responses:
- status_code: 200
body: |
{"instance_url": "http://cometd:8080", "expires_in": "60", "access_token": "abcd"}
- path: /cometd/38.0
methods: ["POST"]
request_body: '{"channel": "/meta/handshake", "supportedConnectionTypes": ["long-polling"], "version": "1.0"}'
responses:
- status_code: 200
body: |
[{"ext":{"replay":true,"payload.format":true},"minimumVersion":"1.0","clientId":"94b112sp7ph1c9s41mycpzik4rkj3","supportedConnectionTypes":["long-polling"],"channel":"/meta/handshake","version":"1.0","successful":true}]
- path: /cometd/38.0
methods: ["POST"]
request_body: '{"channel": "/meta/connect", "connectionType": "long-polling", "clientId": "94b112sp7ph1c9s41mycpzik4rkj3"} '
responses:
- status_code: 200
body: |
[{"data": {"payload": {"CountryIso": "IN"}, "event": {"replayId":1234}}, "channel": "/event/LoginEventStream"}]
- path: /cometd/38.0
methods: ["POST"]
responses:
- status_code: 200
body: |
[{"clientId": "94b112sp7ph1c9s41mycpzik4rkj3", "channel": "/meta/subscribe", "subscription": "/event/LoginEventStream", "successful":true}]
6 changes: 6 additions & 0 deletions x-pack/filebeat/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ services:
image: busybox
depends_on:
elasticsearch: { condition: service_healthy }
cometd: { condition: service_healthy }

elasticsearch:
extends:
Expand All @@ -33,3 +34,8 @@ services:
retries: 300
interval: 1s

cometd:
build: ${ES_BEATS}/testing/environments/docker/cometd
hostname: cometd
ports:
- 8080:8080
65 changes: 65 additions & 0 deletions x-pack/filebeat/docs/inputs/input-cometd.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
[role="xpack"]

:type: cometd

[id="{beatname_lc}-input-{type}"]
=== CometD input

++++
<titleabbrev>CometD</titleabbrev>
++++

Use the `https://docs.cometd.org/[cometd]` input to stream the real-time events from a https://resources.docs.salesforce.com/sfdc/pdf/api_streaming.pdf[Salesforce generic subscription Push Topic].

This input can, for example, be used to receive Login and Logout events that are generated when users log in or out of the Salesforce instance.

Example configuration:

["source","yaml",subs="attributes"]
----
{beatname_lc}.inputs:
- type: cometd
channel_name: /event/MyEventStream
auth.oauth2:
client.id: my-client-id
client.secret: my-client-secret
token_url: https://login.salesforce.com/services/oauth2/token
user: [email protected]
password: my-password
----

==== Configuration options

The `cometd` input supports the following configuration options.

[float]
==== `channel_name`

Salesforce generic subscription Push Topic name. Required.

[float]
==== `auth.oauth2.client.id`

The client ID used as part of the authentication flow. Required.

[float]
==== `auth.oauth2.client.secret`

The client secret used as part of the authentication flow. Required.

[float]
==== `auth.oauth2.token_url`

The endpoint that will be used to generate the token during the oauth2 flow. Required.

[float]
==== `auth.oauth2.user`

The user used as part of the authentication flow. It is required for authentication - grant type password. Required.

[float]
==== `auth.oauth2.password`

The password used as part of the authentication flow. It is required for authentication - grant type password. Required.

:type!:
1 change: 1 addition & 0 deletions x-pack/filebeat/include/list.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 47 additions & 0 deletions x-pack/filebeat/input/cometd/client_mocked.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package cometd

import (
"github.com/elastic/beats/v7/filebeat/channel"
"github.com/elastic/beats/v7/libbeat/beat"
conf "github.com/elastic/elastic-agent-libs/config"
)

type mockedConnector struct {
connectWithError error
outlet channel.Outleter
}

var _ channel.Connector = new(mockedConnector)

func (m *mockedConnector) Connect(c *conf.C) (channel.Outleter, error) {
return m.ConnectWith(c, beat.ClientConfig{})
}

func (m *mockedConnector) ConnectWith(*conf.C, beat.ClientConfig) (channel.Outleter, error) {
if m.connectWithError != nil {
return nil, m.connectWithError
}
return m.outlet, nil
}

type mockedOutleter struct {
onEventHandler func(event beat.Event) bool
}

var _ channel.Outleter = new(mockedOutleter)

func (m mockedOutleter) Close() error {
panic("implement me")
}

func (m mockedOutleter) Done() <-chan struct{} {
panic("implement me")
}

func (m mockedOutleter) OnEvent(event beat.Event) bool {
return m.onEventHandler(event)
}
107 changes: 107 additions & 0 deletions x-pack/filebeat/input/cometd/cometd_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build integration
// +build integration

package cometd

import (
"sync"
"testing"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/filebeat/channel"
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/logp"

bay "github.com/elastic/bayeux"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/mapstr"
)

type eventCaptor struct {
c chan struct{}
closeOnce sync.Once
closed bool
events chan beat.Event
}

func newEventCaptor(events chan beat.Event) channel.Outleter {
return &eventCaptor{
c: make(chan struct{}),
events: events,
}
}

func (ec *eventCaptor) OnEvent(event beat.Event) bool {
ec.events <- event
return true
}

func (ec *eventCaptor) Close() error {
ec.closeOnce.Do(func() {
ec.closed = true
close(ec.c)
})
return nil
}

func (ec *eventCaptor) Done() <-chan struct{} {
return ec.c
}

func TestInput(t *testing.T) {
logp.TestingSetup(logp.WithSelectors("cometd input", "cometd")) //nolint:errcheck // Bad linter! no need to test this.

// Setup the input config.
config := conf.MustNewConfigFrom(mapstr.M{
"channel_name": "channel_name1",
"auth.oauth2.client.id": "client.id",
"auth.oauth2.client.secret": "client.secret",
"auth.oauth2.user": "user",
"auth.oauth2.password": "password",
"auth.oauth2.token_url": "http://cometd:8080/token",
})

// Route input events through our captor instead of sending through ES.
eventsCh := make(chan beat.Event)
defer close(eventsCh)

captor := newEventCaptor(eventsCh)
defer captor.Close()

connector := channel.ConnectorFunc(func(_ *conf.C, _ beat.ClientConfig) (channel.Outleter, error) {
return channel.SubOutlet(captor), nil
})

// Mock the context.
inputContext := input.Context{
Done: make(chan struct{}),
BeatDone: make(chan struct{}),
}

// Setup the input
input, err := NewInput(config, connector, inputContext)
require.NoError(t, err)
require.NotNil(t, input)

var msg bay.MaybeMsg
msg.Msg.Data.Event.ReplayID = 1234
msg.Msg.Data.Payload = []byte(`{"CountryIso": "IN"}`)
msg.Msg.Channel = "channel_name1"

// Run the input.
input.Run()

event := <-eventsCh

val, err := event.GetValue("message")
require.NoError(t, err)
require.Equal(t, string(msg.Msg.Data.Payload), val)

input.Stop()
}
25 changes: 25 additions & 0 deletions x-pack/filebeat/input/cometd/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package cometd

import "fmt"

type config struct {
ChannelName string `config:"channel_name" validate:"required"`
Auth *authConfig `config:"auth"`
}

func (c *config) Validate() error {
if c.ChannelName == "" {
return fmt.Errorf("no channel name was configured or detected")
}
return nil
}

func defaultConfig() config {
var c config
c.ChannelName = "cometd-channel"
return c
}
Loading

0 comments on commit 29c0d4c

Please sign in to comment.