-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Filebeat][CometD] Add CometD input for Salesforce connector #30089
Merged
Merged
Changes from 51 commits
Commits
Show all changes
59 commits
Select commit
Hold shift + click to select a range
199e27f
Add authentication mechanism for cometd input for Salesforce connector
kush-elastic 0067633
update based on comments: clear empty line and add error.Errorf with …
kush-elastic 321463b
Add data collection mechanism for Salesforce cometd input
yug-crest 9a877c2
Merge branch 'salesforce_observability_connector' of https://github.c…
yug-crest 32a1790
Update the approach of the channel
yug-crest 3855505
Add asciidoc for cometd input
yug-crest 6e585ad
Resolve review comments
yug-crest d67702d
Update cometd input to import forked dependency
yug-crest 8a24338
Temorarily remove changelog entry
yug-rajani ab2a42f
Run mage check and update
yug-rajani 958c51c
Update go.mod and go.sum
yug-rajani 7ff87d4
Update go.mod and NOTICE.txt
yug-rajani b673e08
Fetch upstream and resolve merge conflicts
yug-rajani c658cd8
Make changes with respect to ownership transfer
yug-rajani c05d8ba
Update go.mod and go.sum
yug-rajani 798142b
Add two unit test cases and nit
yug-rajani a3fe48b
Add unit test cases and integration tests
yug-rajani dbcc5ee
Sync with master and resolve conflicts
yug-rajani 70b86cd
Merge branch 'elastic-main' into salesforce_cometd_input
yug-rajani 497d16d
Address review comments
yug-rajani f7135cc
Resolve some linting issues
yug-rajani 76268f0
Resolve some linting issues
yug-rajani d114bc4
Add unit test for input
kush-elastic a17f261
Add integration test
kush-elastic 48c335c
Lint issues
kush-elastic 4bc6bb8
Lint
kush-elastic 5300d56
Lint
kush-elastic 2af933f
update bayeux v1.0.3
kush-elastic a58177f
Add NOTICE.txt
kush-elastic ce5f454
Add negative test cases
yug-rajani 7a26efc
requested changes
kush-elastic abd6b70
Resolve CI lint errors
kush-elastic 1024be9
Update go.sum
kush-elastic d8940a9
Update NOTICE.txt
kush-elastic efa7df0
Update unit tests
kush-elastic b03af9f
Update unit test name
kush-elastic ea1cd76
Update tests
kush-elastic 73c028f
Remove test.out
kush-elastic 0f337e1
Use time.After to eliminate possible flaky test
cmacknz 14d43fe
Merge branch 'elastic:main' into salesforce_cometd_input
yug-rajani 14f07a2
Add buffer of 1 in message channel
kush-elastic 4d7e486
Update tests based on bayeux client status watcher changes
kush-elastic 292bd7e
incorporate bayeux changes
kush-elastic 09f81e6
Update unit tests
kush-elastic 80021a0
resolve unit tests
kush-elastic b5cee66
additional tests with multiple inputs, added channel_name filed in co…
kush-elastic 94663b6
nit: Event struct -> event struct
kush-elastic ab365e8
test case improvement
kush-elastic c1a80a6
update looger Error -> Errorw to print formatted errors
kush-elastic 7ce0886
update docs with reference links
kush-elastic 66dbbfe
Add CHANGELOG entry and update docs
kush-elastic 261991e
Merge branch 'elastic:main' into salesforce_cometd_input
yug-rajani 6d5aaa8
Use mapstr.M instead of common.MapStr
kush-elastic c893169
Merge branch 'elastic:main' into salesforce_cometd_input
kush-elastic 3dba2fb
Changes based on Replace common.Config and friends with config.C
kush-elastic e427fed
resolve make check error
kush-elastic f13db12
resolve comman config errors
kush-elastic a4a02c4
Merge branch 'main' into salesforce_cometd_input
kush-elastic eac0644
Merge branch 'main' into salesforce_cometd_input
kush-elastic File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5805,6 +5805,37 @@ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND | |
|
||
|
||
|
||
-------------------------------------------------------------------------------- | ||
Dependency : github.com/elastic/bayeux | ||
Version: v1.0.5 | ||
Licence type (autodetected): MIT | ||
-------------------------------------------------------------------------------- | ||
|
||
Contents of probable licence file $GOMODCACHE/github.com/elastic/[email protected]/LICENSE: | ||
|
||
MIT License | ||
|
||
Copyright (c) 2017 Zander Hill | ||
|
||
Permission is hereby granted, free of charge, to any person obtaining a copy | ||
of this software and associated documentation files (the "Software"), to deal | ||
in the Software without restriction, including without limitation the rights | ||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
copies of the Software, and to permit persons to whom the Software is | ||
furnished to do so, subject to the following conditions: | ||
|
||
The above copyright notice and this permission notice shall be included in all | ||
copies or substantial portions of the Software. | ||
|
||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
SOFTWARE. | ||
|
||
|
||
-------------------------------------------------------------------------------- | ||
Dependency : github.com/elastic/elastic-agent-client/v7 | ||
Version: v7.0.0-20210727140539-f0905d9377f6 | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
FROM docker.elastic.co/observability/stream:v0.6.1 | ||
|
||
RUN apt update && \ | ||
apt -y install curl | ||
ENV PORT="8080" | ||
COPY files /files | ||
HEALTHCHECK --interval=1s --retries=600 CMD curl -X POST http://localhost:8080/token | ||
CMD [ "http-server", "--addr=:8080", "--config=/files/config.yml" ] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
rules: | ||
- path: /token | ||
methods: ["POST"] | ||
responses: | ||
- status_code: 200 | ||
body: | | ||
{"instance_url": "http://cometd:8080", "expires_in": "60", "access_token": "abcd"} | ||
- path: /cometd/38.0 | ||
methods: ["POST"] | ||
request_body: '{"channel": "/meta/handshake", "supportedConnectionTypes": ["long-polling"], "version": "1.0"}' | ||
responses: | ||
- status_code: 200 | ||
body: | | ||
[{"ext":{"replay":true,"payload.format":true},"minimumVersion":"1.0","clientId":"94b112sp7ph1c9s41mycpzik4rkj3","supportedConnectionTypes":["long-polling"],"channel":"/meta/handshake","version":"1.0","successful":true}] | ||
- path: /cometd/38.0 | ||
methods: ["POST"] | ||
request_body: '{"channel": "/meta/connect", "connectionType": "long-polling", "clientId": "94b112sp7ph1c9s41mycpzik4rkj3"} ' | ||
responses: | ||
- status_code: 200 | ||
body: | | ||
[{"data": {"payload": {"CountryIso": "IN"}, "event": {"replayId":1234}}, "channel": "/event/LoginEventStream"}] | ||
- path: /cometd/38.0 | ||
methods: ["POST"] | ||
responses: | ||
- status_code: 200 | ||
body: | | ||
[{"clientId": "94b112sp7ph1c9s41mycpzik4rkj3", "channel": "/meta/subscribe", "subscription": "/event/LoginEventStream", "successful":true}] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
[role="xpack"] | ||
|
||
:type: cometd | ||
|
||
[id="{beatname_lc}-input-{type}"] | ||
=== CometD input | ||
|
||
++++ | ||
<titleabbrev>CometD</titleabbrev> | ||
++++ | ||
|
||
Use the `https://docs.cometd.org/[cometd]` input to stream the real-time events from a https://resources.docs.salesforce.com/sfdc/pdf/api_streaming.pdf[Salesforce generic subscription Push Topic]. | ||
|
||
This input can, for example, be used to receive Login and Logout events that are generated when users log in or out of the Salesforce instance. | ||
|
||
Example configuration: | ||
|
||
["source","yaml",subs="attributes"] | ||
---- | ||
{beatname_lc}.inputs: | ||
- type: cometd | ||
channel_name: /event/MyEventStream | ||
auth.oauth2: | ||
client.id: my-client-id | ||
client.secret: my-client-secret | ||
token_url: https://login.salesforce.com/services/oauth2/token | ||
user: [email protected] | ||
password: my-password | ||
---- | ||
|
||
==== Configuration options | ||
|
||
The `cometd` input supports the following configuration options. | ||
|
||
[float] | ||
==== `channel_name` | ||
|
||
Salesforce generic subscription Push Topic name. Required. | ||
|
||
[float] | ||
==== `auth.oauth2.client.id` | ||
|
||
The client ID used as part of the authentication flow. Required. | ||
|
||
[float] | ||
==== `auth.oauth2.client.secret` | ||
|
||
The client secret used as part of the authentication flow. Required. | ||
|
||
[float] | ||
==== `auth.oauth2.token_url` | ||
|
||
The endpoint that will be used to generate the token during the oauth2 flow. Required. | ||
|
||
[float] | ||
==== `auth.oauth2.user` | ||
|
||
The user used as part of the authentication flow. It is required for authentication - grant type password. Required. | ||
|
||
[float] | ||
==== `auth.oauth2.password` | ||
|
||
The password used as part of the authentication flow. It is required for authentication - grant type password. Required. | ||
|
||
:type!: |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
// or more contributor license agreements. Licensed under the Elastic License; | ||
// you may not use this file except in compliance with the Elastic License. | ||
|
||
package cometd | ||
|
||
import ( | ||
"github.com/elastic/beats/v7/filebeat/channel" | ||
"github.com/elastic/beats/v7/libbeat/beat" | ||
"github.com/elastic/beats/v7/libbeat/common" | ||
) | ||
|
||
type mockedConnector struct { | ||
connectWithError error | ||
outlet channel.Outleter | ||
} | ||
|
||
var _ channel.Connector = new(mockedConnector) | ||
|
||
func (m *mockedConnector) Connect(c *common.Config) (channel.Outleter, error) { | ||
return m.ConnectWith(c, beat.ClientConfig{}) | ||
} | ||
|
||
func (m *mockedConnector) ConnectWith(*common.Config, beat.ClientConfig) (channel.Outleter, error) { | ||
if m.connectWithError != nil { | ||
return nil, m.connectWithError | ||
} | ||
return m.outlet, nil | ||
} | ||
|
||
type mockedOutleter struct { | ||
onEventHandler func(event beat.Event) bool | ||
} | ||
|
||
var _ channel.Outleter = new(mockedOutleter) | ||
|
||
func (m mockedOutleter) Close() error { | ||
panic("implement me") | ||
} | ||
|
||
func (m mockedOutleter) Done() <-chan struct{} { | ||
panic("implement me") | ||
} | ||
|
||
func (m mockedOutleter) OnEvent(event beat.Event) bool { | ||
return m.onEventHandler(event) | ||
} |
106 changes: 106 additions & 0 deletions
106
x-pack/filebeat/input/cometd/cometd_integration_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
// or more contributor license agreements. Licensed under the Elastic License; | ||
// you may not use this file except in compliance with the Elastic License. | ||
|
||
cmacknz marked this conversation as resolved.
Show resolved
Hide resolved
|
||
//go:build integration | ||
// +build integration | ||
|
||
package cometd | ||
|
||
import ( | ||
"sync" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/elastic/beats/v7/filebeat/channel" | ||
"github.com/elastic/beats/v7/filebeat/input" | ||
"github.com/elastic/beats/v7/libbeat/beat" | ||
"github.com/elastic/beats/v7/libbeat/common" | ||
"github.com/elastic/beats/v7/libbeat/logp" | ||
|
||
bay "github.com/elastic/bayeux" | ||
) | ||
|
||
type eventCaptor struct { | ||
c chan struct{} | ||
closeOnce sync.Once | ||
closed bool | ||
events chan beat.Event | ||
} | ||
|
||
func newEventCaptor(events chan beat.Event) channel.Outleter { | ||
return &eventCaptor{ | ||
c: make(chan struct{}), | ||
events: events, | ||
} | ||
} | ||
|
||
func (ec *eventCaptor) OnEvent(event beat.Event) bool { | ||
ec.events <- event | ||
return true | ||
} | ||
|
||
func (ec *eventCaptor) Close() error { | ||
ec.closeOnce.Do(func() { | ||
ec.closed = true | ||
close(ec.c) | ||
}) | ||
return nil | ||
} | ||
|
||
func (ec *eventCaptor) Done() <-chan struct{} { | ||
return ec.c | ||
} | ||
|
||
func TestInput(t *testing.T) { | ||
logp.TestingSetup(logp.WithSelectors("cometd input", "cometd")) //nolint:errcheck // Bad linter! no need to test this. | ||
|
||
// Setup the input config. | ||
config := common.MustNewConfigFrom(common.MapStr{ | ||
"channel_name": "channel_name1", | ||
"auth.oauth2.client.id": "client.id", | ||
"auth.oauth2.client.secret": "client.secret", | ||
"auth.oauth2.user": "user", | ||
"auth.oauth2.password": "password", | ||
"auth.oauth2.token_url": "http://cometd:8080/token", | ||
}) | ||
|
||
// Route input events through our captor instead of sending through ES. | ||
eventsCh := make(chan beat.Event) | ||
defer close(eventsCh) | ||
|
||
captor := newEventCaptor(eventsCh) | ||
defer captor.Close() | ||
|
||
connector := channel.ConnectorFunc(func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) { | ||
return channel.SubOutlet(captor), nil | ||
}) | ||
|
||
// Mock the context. | ||
inputContext := input.Context{ | ||
Done: make(chan struct{}), | ||
BeatDone: make(chan struct{}), | ||
} | ||
|
||
// Setup the input | ||
input, err := NewInput(config, connector, inputContext) | ||
require.NoError(t, err) | ||
require.NotNil(t, input) | ||
|
||
var msg bay.MaybeMsg | ||
msg.Msg.Data.Event.ReplayID = 1234 | ||
msg.Msg.Data.Payload = []byte(`{"CountryIso": "IN"}`) | ||
msg.Msg.Channel = "channel_name1" | ||
|
||
// Run the input. | ||
input.Run() | ||
|
||
event := <-eventsCh | ||
|
||
val, err := event.GetValue("message") | ||
require.NoError(t, err) | ||
require.Equal(t, string(msg.Msg.Data.Payload), val) | ||
|
||
input.Stop() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
// or more contributor license agreements. Licensed under the Elastic License; | ||
// you may not use this file except in compliance with the Elastic License. | ||
|
||
package cometd | ||
|
||
import "fmt" | ||
|
||
type config struct { | ||
ChannelName string `config:"channel_name" validate:"required"` | ||
Auth *authConfig `config:"auth"` | ||
} | ||
|
||
func (c *config) Validate() error { | ||
if c.ChannelName == "" { | ||
return fmt.Errorf("no channel name was configured or detected") | ||
} | ||
return nil | ||
} | ||
|
||
func defaultConfig() config { | ||
var c config | ||
c.ChannelName = "cometd-channel" | ||
return c | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's probably something that can be added to the stream. cc @andrewkroh
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The endpoint used here is only present due to the it running in http-server mode and is not generally applicable. But a general purpose healthcheck API could probably be added to provide feedback for some of the other output modes.