Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filebeat][CometD] Add CometD input for Salesforce connector #30089

Merged
merged 59 commits into from
May 2, 2022
Merged
Show file tree
Hide file tree
Changes from 51 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
199e27f
Add authentication mechanism for cometd input for Salesforce connector
kush-elastic Nov 26, 2021
0067633
update based on comments: clear empty line and add error.Errorf with …
kush-elastic Dec 1, 2021
321463b
Add data collection mechanism for Salesforce cometd input
yug-crest Dec 8, 2021
9a877c2
Merge branch 'salesforce_observability_connector' of https://github.c…
yug-crest Dec 8, 2021
32a1790
Update the approach of the channel
yug-crest Dec 16, 2021
3855505
Add asciidoc for cometd input
yug-crest Dec 16, 2021
6e585ad
Resolve review comments
yug-crest Dec 24, 2021
d67702d
Update cometd input to import forked dependency
yug-crest Jan 13, 2022
8a24338
Temorarily remove changelog entry
yug-rajani Jan 28, 2022
ab2a42f
Run mage check and update
yug-rajani Jan 28, 2022
958c51c
Update go.mod and go.sum
yug-rajani Jan 30, 2022
7ff87d4
Update go.mod and NOTICE.txt
yug-rajani Feb 1, 2022
b673e08
Fetch upstream and resolve merge conflicts
yug-rajani Mar 16, 2022
c658cd8
Make changes with respect to ownership transfer
yug-rajani Mar 16, 2022
c05d8ba
Update go.mod and go.sum
yug-rajani Mar 16, 2022
798142b
Add two unit test cases and nit
yug-rajani Mar 25, 2022
a3fe48b
Add unit test cases and integration tests
yug-rajani Mar 27, 2022
dbcc5ee
Sync with master and resolve conflicts
yug-rajani Mar 28, 2022
70b86cd
Merge branch 'elastic-main' into salesforce_cometd_input
yug-rajani Mar 28, 2022
497d16d
Address review comments
yug-rajani Apr 1, 2022
f7135cc
Resolve some linting issues
yug-rajani Apr 4, 2022
76268f0
Resolve some linting issues
yug-rajani Apr 5, 2022
d114bc4
Add unit test for input
kush-elastic Apr 5, 2022
a17f261
Add integration test
kush-elastic Apr 6, 2022
48c335c
Lint issues
kush-elastic Apr 6, 2022
4bc6bb8
Lint
kush-elastic Apr 6, 2022
5300d56
Lint
kush-elastic Apr 6, 2022
2af933f
update bayeux v1.0.3
kush-elastic Apr 6, 2022
a58177f
Add NOTICE.txt
kush-elastic Apr 6, 2022
ce5f454
Add negative test cases
yug-rajani Apr 7, 2022
7a26efc
requested changes
kush-elastic Apr 13, 2022
abd6b70
Resolve CI lint errors
kush-elastic Apr 13, 2022
1024be9
Update go.sum
kush-elastic Apr 13, 2022
d8940a9
Update NOTICE.txt
kush-elastic Apr 13, 2022
efa7df0
Update unit tests
kush-elastic Apr 13, 2022
b03af9f
Update unit test name
kush-elastic Apr 13, 2022
ea1cd76
Update tests
kush-elastic Apr 14, 2022
73c028f
Remove test.out
kush-elastic Apr 14, 2022
0f337e1
Use time.After to eliminate possible flaky test
cmacknz Apr 14, 2022
14d43fe
Merge branch 'elastic:main' into salesforce_cometd_input
yug-rajani Apr 18, 2022
14f07a2
Add buffer of 1 in message channel
kush-elastic Apr 19, 2022
4d7e486
Update tests based on bayeux client status watcher changes
kush-elastic Apr 19, 2022
292bd7e
incorporate bayeux changes
kush-elastic Apr 21, 2022
09f81e6
Update unit tests
kush-elastic Apr 21, 2022
80021a0
resolve unit tests
kush-elastic Apr 22, 2022
b5cee66
additional tests with multiple inputs, added channel_name filed in co…
kush-elastic Apr 22, 2022
94663b6
nit: Event struct -> event struct
kush-elastic Apr 26, 2022
ab365e8
test case improvement
kush-elastic Apr 26, 2022
c1a80a6
update looger Error -> Errorw to print formatted errors
kush-elastic Apr 27, 2022
7ce0886
update docs with reference links
kush-elastic Apr 27, 2022
66dbbfe
Add CHANGELOG entry and update docs
kush-elastic Apr 28, 2022
261991e
Merge branch 'elastic:main' into salesforce_cometd_input
yug-rajani Apr 29, 2022
6d5aaa8
Use mapstr.M instead of common.MapStr
kush-elastic Apr 29, 2022
c893169
Merge branch 'elastic:main' into salesforce_cometd_input
kush-elastic Apr 29, 2022
3dba2fb
Changes based on Replace common.Config and friends with config.C
kush-elastic Apr 29, 2022
e427fed
resolve make check error
kush-elastic Apr 29, 2022
f13db12
resolve comman config errors
kush-elastic Apr 29, 2022
a4a02c4
Merge branch 'main' into salesforce_cometd_input
kush-elastic Apr 29, 2022
eac0644
Merge branch 'main' into salesforce_cometd_input
kush-elastic May 2, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -5784,6 +5784,7 @@ https://github.com/elastic/beats/compare/v6.2.3\...v6.3.0[View commits]
- Add support human friendly size for the UDP input. {pull}6886[6886]
- Add Syslog input to ingest RFC3164 Events via TCP and UDP {pull}6842[6842]
- Remove the undefined `username` option from the Redis input and clarify the documentation. {pull}6662[6662]
- Add CometD input {pull}30089[30089]

*Heartbeat*

Expand Down
31 changes: 31 additions & 0 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5805,6 +5805,37 @@ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND



--------------------------------------------------------------------------------
Dependency : github.com/elastic/bayeux
Version: v1.0.5
Licence type (autodetected): MIT
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/[email protected]/LICENSE:

MIT License

Copyright (c) 2017 Zander Hill

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.


--------------------------------------------------------------------------------
Dependency : github.com/elastic/elastic-agent-client/v7
Version: v7.0.0-20210727140539-f0905d9377f6
Expand Down
3 changes: 3 additions & 0 deletions filebeat/docs/filebeat-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ You can configure {beatname_uc} to use the following inputs:
* <<{beatname_lc}-input-aws-s3>>
* <<{beatname_lc}-input-azure-eventhub>>
* <<{beatname_lc}-input-cloudfoundry>>
* <<{beatname_lc}-input-cometd>>
* <<{beatname_lc}-input-container>>
* <<{beatname_lc}-input-filestream>>
* <<{beatname_lc}-input-gcp-pubsub>>
Expand Down Expand Up @@ -94,6 +95,8 @@ include::../../x-pack/filebeat/docs/inputs/input-azure-eventhub.asciidoc[]

include::../../x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc[]

include::../../x-pack/filebeat/docs/inputs/input-cometd.asciidoc[]

include::inputs/input-container.asciidoc[]

include::inputs/input-filestream.asciidoc[]
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ require (
)

require (
github.com/elastic/bayeux v1.0.5
github.com/elastic/elastic-agent-libs v0.1.1
github.com/shirou/gopsutil/v3 v3.21.12
go.elastic.co/apm/module/apmelasticsearch/v2 v2.0.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,8 @@ github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7j
github.com/eclipse/paho.mqtt.golang v1.3.5 h1:sWtmgNxYM9P2sP+xEItMozsR3w0cqZFlqnNN1bdl41Y=
github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc=
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/elastic/bayeux v1.0.5 h1:UceFq01ipmT3S8DzFK+uVAkbCdiPR0Bqei8qIGmUeY0=
github.com/elastic/bayeux v1.0.5/go.mod h1:CSI4iP7qeo5MMlkznGvYKftp8M7qqP/3nzmVZoXHY68=
github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3 h1:lnDkqiRFKm0rxdljqrj3lotWinO9+jFmeDXIC4gvIQs=
github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3/go.mod h1:aPqzac6AYkipvp4hufTyMj5PDIphF3+At8zr7r51xjY=
github.com/elastic/elastic-agent-client/v7 v7.0.0-20210727140539-f0905d9377f6 h1:nFvXHBjYK3e9+xF0WKDeAKK4aOO51uC28s+L9rBmilo=
Expand Down
8 changes: 8 additions & 0 deletions testing/environments/docker/cometd/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
FROM docker.elastic.co/observability/stream:v0.6.1

RUN apt update && \
apt -y install curl
ENV PORT="8080"
COPY files /files
HEALTHCHECK --interval=1s --retries=600 CMD curl -X POST http://localhost:8080/token
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's probably something that can be added to the stream. cc @andrewkroh

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The endpoint used here is only present due to the it running in http-server mode and is not generally applicable. But a general purpose healthcheck API could probably be added to provide feedback for some of the other output modes.

CMD [ "http-server", "--addr=:8080", "--config=/files/config.yml" ]
27 changes: 27 additions & 0 deletions testing/environments/docker/cometd/files/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
rules:
- path: /token
methods: ["POST"]
responses:
- status_code: 200
body: |
{"instance_url": "http://cometd:8080", "expires_in": "60", "access_token": "abcd"}
- path: /cometd/38.0
methods: ["POST"]
request_body: '{"channel": "/meta/handshake", "supportedConnectionTypes": ["long-polling"], "version": "1.0"}'
responses:
- status_code: 200
body: |
[{"ext":{"replay":true,"payload.format":true},"minimumVersion":"1.0","clientId":"94b112sp7ph1c9s41mycpzik4rkj3","supportedConnectionTypes":["long-polling"],"channel":"/meta/handshake","version":"1.0","successful":true}]
- path: /cometd/38.0
methods: ["POST"]
request_body: '{"channel": "/meta/connect", "connectionType": "long-polling", "clientId": "94b112sp7ph1c9s41mycpzik4rkj3"} '
responses:
- status_code: 200
body: |
[{"data": {"payload": {"CountryIso": "IN"}, "event": {"replayId":1234}}, "channel": "/event/LoginEventStream"}]
- path: /cometd/38.0
methods: ["POST"]
responses:
- status_code: 200
body: |
[{"clientId": "94b112sp7ph1c9s41mycpzik4rkj3", "channel": "/meta/subscribe", "subscription": "/event/LoginEventStream", "successful":true}]
6 changes: 6 additions & 0 deletions x-pack/filebeat/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ services:
image: busybox
depends_on:
elasticsearch: { condition: service_healthy }
cometd: { condition: service_healthy }

elasticsearch:
extends:
Expand All @@ -33,3 +34,8 @@ services:
retries: 300
interval: 1s

cometd:
build: ${ES_BEATS}/testing/environments/docker/cometd
hostname: cometd
ports:
- 8080:8080
65 changes: 65 additions & 0 deletions x-pack/filebeat/docs/inputs/input-cometd.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
[role="xpack"]

:type: cometd

[id="{beatname_lc}-input-{type}"]
=== CometD input

++++
<titleabbrev>CometD</titleabbrev>
++++

Use the `https://docs.cometd.org/[cometd]` input to stream the real-time events from a https://resources.docs.salesforce.com/sfdc/pdf/api_streaming.pdf[Salesforce generic subscription Push Topic].

This input can, for example, be used to receive Login and Logout events that are generated when users log in or out of the Salesforce instance.

Example configuration:

["source","yaml",subs="attributes"]
----
{beatname_lc}.inputs:
- type: cometd
channel_name: /event/MyEventStream
auth.oauth2:
client.id: my-client-id
client.secret: my-client-secret
token_url: https://login.salesforce.com/services/oauth2/token
user: [email protected]
password: my-password
----

==== Configuration options

The `cometd` input supports the following configuration options.

[float]
==== `channel_name`

Salesforce generic subscription Push Topic name. Required.

[float]
==== `auth.oauth2.client.id`

The client ID used as part of the authentication flow. Required.

[float]
==== `auth.oauth2.client.secret`

The client secret used as part of the authentication flow. Required.

[float]
==== `auth.oauth2.token_url`

The endpoint that will be used to generate the token during the oauth2 flow. Required.

[float]
==== `auth.oauth2.user`

The user used as part of the authentication flow. It is required for authentication - grant type password. Required.

[float]
==== `auth.oauth2.password`

The password used as part of the authentication flow. It is required for authentication - grant type password. Required.

:type!:
1 change: 1 addition & 0 deletions x-pack/filebeat/include/list.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 47 additions & 0 deletions x-pack/filebeat/input/cometd/client_mocked.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package cometd

import (
"github.com/elastic/beats/v7/filebeat/channel"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
)

type mockedConnector struct {
connectWithError error
outlet channel.Outleter
}

var _ channel.Connector = new(mockedConnector)

func (m *mockedConnector) Connect(c *common.Config) (channel.Outleter, error) {
return m.ConnectWith(c, beat.ClientConfig{})
}

func (m *mockedConnector) ConnectWith(*common.Config, beat.ClientConfig) (channel.Outleter, error) {
if m.connectWithError != nil {
return nil, m.connectWithError
}
return m.outlet, nil
}

type mockedOutleter struct {
onEventHandler func(event beat.Event) bool
}

var _ channel.Outleter = new(mockedOutleter)

func (m mockedOutleter) Close() error {
panic("implement me")
}

func (m mockedOutleter) Done() <-chan struct{} {
panic("implement me")
}

func (m mockedOutleter) OnEvent(event beat.Event) bool {
return m.onEventHandler(event)
}
106 changes: 106 additions & 0 deletions x-pack/filebeat/input/cometd/cometd_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

cmacknz marked this conversation as resolved.
Show resolved Hide resolved
//go:build integration
// +build integration

package cometd

import (
"sync"
"testing"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/filebeat/channel"
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"

bay "github.com/elastic/bayeux"
)

type eventCaptor struct {
c chan struct{}
closeOnce sync.Once
closed bool
events chan beat.Event
}

func newEventCaptor(events chan beat.Event) channel.Outleter {
return &eventCaptor{
c: make(chan struct{}),
events: events,
}
}

func (ec *eventCaptor) OnEvent(event beat.Event) bool {
ec.events <- event
return true
}

func (ec *eventCaptor) Close() error {
ec.closeOnce.Do(func() {
ec.closed = true
close(ec.c)
})
return nil
}

func (ec *eventCaptor) Done() <-chan struct{} {
return ec.c
}

func TestInput(t *testing.T) {
logp.TestingSetup(logp.WithSelectors("cometd input", "cometd")) //nolint:errcheck // Bad linter! no need to test this.

// Setup the input config.
config := common.MustNewConfigFrom(common.MapStr{
"channel_name": "channel_name1",
"auth.oauth2.client.id": "client.id",
"auth.oauth2.client.secret": "client.secret",
"auth.oauth2.user": "user",
"auth.oauth2.password": "password",
"auth.oauth2.token_url": "http://cometd:8080/token",
})

// Route input events through our captor instead of sending through ES.
eventsCh := make(chan beat.Event)
defer close(eventsCh)

captor := newEventCaptor(eventsCh)
defer captor.Close()

connector := channel.ConnectorFunc(func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) {
return channel.SubOutlet(captor), nil
})

// Mock the context.
inputContext := input.Context{
Done: make(chan struct{}),
BeatDone: make(chan struct{}),
}

// Setup the input
input, err := NewInput(config, connector, inputContext)
require.NoError(t, err)
require.NotNil(t, input)

var msg bay.MaybeMsg
msg.Msg.Data.Event.ReplayID = 1234
msg.Msg.Data.Payload = []byte(`{"CountryIso": "IN"}`)
msg.Msg.Channel = "channel_name1"

// Run the input.
input.Run()

event := <-eventsCh

val, err := event.GetValue("message")
require.NoError(t, err)
require.Equal(t, string(msg.Msg.Data.Payload), val)

input.Stop()
}
25 changes: 25 additions & 0 deletions x-pack/filebeat/input/cometd/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package cometd

import "fmt"

type config struct {
ChannelName string `config:"channel_name" validate:"required"`
Auth *authConfig `config:"auth"`
}

func (c *config) Validate() error {
if c.ChannelName == "" {
return fmt.Errorf("no channel name was configured or detected")
}
return nil
}

func defaultConfig() config {
var c config
c.ChannelName = "cometd-channel"
return c
}
Loading