Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filebeat][CometD] Add CometD input for Salesforce connector #31492

Merged
merged 69 commits into from
May 6, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
199e27f
Add authentication mechanism for cometd input for Salesforce connector
kush-elastic Nov 26, 2021
0067633
update based on comments: clear empty line and add error.Errorf with …
kush-elastic Dec 1, 2021
321463b
Add data collection mechanism for Salesforce cometd input
yug-crest Dec 8, 2021
9a877c2
Merge branch 'salesforce_observability_connector' of https://github.c…
yug-crest Dec 8, 2021
32a1790
Update the approach of the channel
yug-crest Dec 16, 2021
3855505
Add asciidoc for cometd input
yug-crest Dec 16, 2021
6e585ad
Resolve review comments
yug-crest Dec 24, 2021
d67702d
Update cometd input to import forked dependency
yug-crest Jan 13, 2022
8a24338
Temorarily remove changelog entry
yug-rajani Jan 28, 2022
ab2a42f
Run mage check and update
yug-rajani Jan 28, 2022
958c51c
Update go.mod and go.sum
yug-rajani Jan 30, 2022
7ff87d4
Update go.mod and NOTICE.txt
yug-rajani Feb 1, 2022
b673e08
Fetch upstream and resolve merge conflicts
yug-rajani Mar 16, 2022
c658cd8
Make changes with respect to ownership transfer
yug-rajani Mar 16, 2022
c05d8ba
Update go.mod and go.sum
yug-rajani Mar 16, 2022
798142b
Add two unit test cases and nit
yug-rajani Mar 25, 2022
a3fe48b
Add unit test cases and integration tests
yug-rajani Mar 27, 2022
dbcc5ee
Sync with master and resolve conflicts
yug-rajani Mar 28, 2022
70b86cd
Merge branch 'elastic-main' into salesforce_cometd_input
yug-rajani Mar 28, 2022
497d16d
Address review comments
yug-rajani Apr 1, 2022
f7135cc
Resolve some linting issues
yug-rajani Apr 4, 2022
76268f0
Resolve some linting issues
yug-rajani Apr 5, 2022
d114bc4
Add unit test for input
kush-elastic Apr 5, 2022
a17f261
Add integration test
kush-elastic Apr 6, 2022
48c335c
Lint issues
kush-elastic Apr 6, 2022
4bc6bb8
Lint
kush-elastic Apr 6, 2022
5300d56
Lint
kush-elastic Apr 6, 2022
2af933f
update bayeux v1.0.3
kush-elastic Apr 6, 2022
a58177f
Add NOTICE.txt
kush-elastic Apr 6, 2022
ce5f454
Add negative test cases
yug-rajani Apr 7, 2022
7a26efc
requested changes
kush-elastic Apr 13, 2022
abd6b70
Resolve CI lint errors
kush-elastic Apr 13, 2022
1024be9
Update go.sum
kush-elastic Apr 13, 2022
d8940a9
Update NOTICE.txt
kush-elastic Apr 13, 2022
efa7df0
Update unit tests
kush-elastic Apr 13, 2022
b03af9f
Update unit test name
kush-elastic Apr 13, 2022
ea1cd76
Update tests
kush-elastic Apr 14, 2022
73c028f
Remove test.out
kush-elastic Apr 14, 2022
0f337e1
Use time.After to eliminate possible flaky test
cmacknz Apr 14, 2022
14d43fe
Merge branch 'elastic:main' into salesforce_cometd_input
yug-rajani Apr 18, 2022
14f07a2
Add buffer of 1 in message channel
kush-elastic Apr 19, 2022
4d7e486
Update tests based on bayeux client status watcher changes
kush-elastic Apr 19, 2022
292bd7e
incorporate bayeux changes
kush-elastic Apr 21, 2022
09f81e6
Update unit tests
kush-elastic Apr 21, 2022
80021a0
resolve unit tests
kush-elastic Apr 22, 2022
b5cee66
additional tests with multiple inputs, added channel_name filed in co…
kush-elastic Apr 22, 2022
94663b6
nit: Event struct -> event struct
kush-elastic Apr 26, 2022
ab365e8
test case improvement
kush-elastic Apr 26, 2022
c1a80a6
update looger Error -> Errorw to print formatted errors
kush-elastic Apr 27, 2022
7ce0886
update docs with reference links
kush-elastic Apr 27, 2022
66dbbfe
Add CHANGELOG entry and update docs
kush-elastic Apr 28, 2022
261991e
Merge branch 'elastic:main' into salesforce_cometd_input
yug-rajani Apr 29, 2022
6d5aaa8
Use mapstr.M instead of common.MapStr
kush-elastic Apr 29, 2022
c893169
Merge branch 'elastic:main' into salesforce_cometd_input
kush-elastic Apr 29, 2022
3dba2fb
Changes based on Replace common.Config and friends with config.C
kush-elastic Apr 29, 2022
e427fed
resolve make check error
kush-elastic Apr 29, 2022
f13db12
resolve comman config errors
kush-elastic Apr 29, 2022
a4a02c4
Merge branch 'main' into salesforce_cometd_input
kush-elastic Apr 29, 2022
eac0644
Merge branch 'main' into salesforce_cometd_input
kush-elastic May 2, 2022
99d8c0b
update unit tests
kush-elastic May 3, 2022
e1ad7db
update multiple input unit test
kush-elastic May 3, 2022
b79b041
Update TestMultiInput unit test
kush-elastic May 3, 2022
163b688
Update minor nit to re-trigger CI
kush-elastic May 3, 2022
a14fa10
Resolve flanky tests
kush-elastic May 3, 2022
33a9efa
Resolve linting issues
kush-elastic May 3, 2022
ec0bd3e
Resolve tests -> send empty response {}
kush-elastic May 5, 2022
9ff0ff4
nit: inputType -> expectedHTTPEventCount
kush-elastic May 5, 2022
7d79299
nit: wg -> waitForEventCollection
kush-elastic May 5, 2022
e04b725
Merge branch 'elastic:main' into salesforce_cometd_input
yug-rajani May 5, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions x-pack/filebeat/input/cometd/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ func (in *cometdInput) run() error {
for e := range in.msgCh {
if e.Failed() {
return fmt.Errorf("error collecting events: %w", e.Err)
}
if !e.Msg.Successful {
} else if !e.Msg.Successful {
var event event
// To handle the last response where the object received was empty
if e.Msg.Data.Payload == nil {
Expand Down
78 changes: 23 additions & 55 deletions x-pack/filebeat/input/cometd/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,15 @@ import (
"github.com/elastic/elastic-agent-libs/mapstr"
)

const (
firstChannel = "channel_name1"
secondChannel = "channel_name2"
)

var (
serverURL string
inputType int
called uint64
)

func TestInputDone(t *testing.T) {
config := mapstr.M{
"channel_name": firstChannel,
"channel_name": "channel_channel",
"auth.oauth2.client.id": "DEMOCLIENTID",
"auth.oauth2.client.secret": "DEMOCLIENTSECRET",
"auth.oauth2.user": "salesforce_user",
Expand Down Expand Up @@ -68,6 +64,7 @@ func TestMakeEventFailure(t *testing.T) {
}

func TestSingleInput(t *testing.T) {
inputType = 1
defer atomic.StoreUint64(&called, 0)
eventsCh := make(chan beat.Event)
defer close(eventsCh)
Expand Down Expand Up @@ -96,7 +93,7 @@ func TestSingleInput(t *testing.T) {
"auth.oauth2.password": "password",
}

r := http.HandlerFunc(oauth2SingleEventHandler)
r := http.HandlerFunc(oauth2Handler)
server := httptest.NewServer(r)
defer server.Close()

Expand All @@ -117,6 +114,7 @@ func TestSingleInput(t *testing.T) {
}

func TestInputStop_Wait(t *testing.T) {
inputType = 1
defer atomic.StoreUint64(&called, 0)
eventsCh := make(chan beat.Event)
defer close(eventsCh)
Expand Down Expand Up @@ -151,7 +149,7 @@ func TestInputStop_Wait(t *testing.T) {
"auth.oauth2.password": "password",
}

r := http.HandlerFunc(oauth2SingleEventHandler)
r := http.HandlerFunc(oauth2Handler)
server := httptest.NewServer(r)
defer server.Close()

Expand All @@ -170,20 +168,22 @@ func TestInputStop_Wait(t *testing.T) {
require.Equal(t, 1, bay.GetConnectedCount())

var wg sync.WaitGroup
var waitForConnections sync.WaitGroup
wg.Add(1)
waitForConnections.Add(1)
go func() {
require.Equal(t, 1, bay.GetConnectedCount()) // current open channels count should be 1
event := <-eventsCh
assertEventMatches(t, expected, event) // wait for single event
wg.Done()
time.Sleep(100 * time.Millisecond) // let input.Stop() be executed.
require.Equal(t, 0, bay.GetConnectedCount()) // current open channels count should be 0
waitForConnections.Done()
kush-elastic marked this conversation as resolved.
Show resolved Hide resolved
}()

time.Sleep(100 * time.Millisecond) // let input.Stop() be executed.

wg.Wait()
input.Wait()
waitForConnections.Wait()
}

func TestStop(t *testing.T) {
Expand Down Expand Up @@ -241,6 +241,7 @@ func TestWait(t *testing.T) {
}

func TestMultiInput(t *testing.T) {
inputType = 2
defer atomic.StoreUint64(&called, 0)
eventsCh := make(chan beat.Event)
defer close(eventsCh)
Expand Down Expand Up @@ -291,63 +292,28 @@ func TestMultiInput(t *testing.T) {
require.NotNil(t, input2)

require.Equal(t, 0, bay.GetConnectedCount())
// run input
input1.Run()
defer input1.Stop()

// run input
input2.Run()
defer input2.Stop()

got := 0
go func() {
fmt.Println("start")
// run input
input1.Run()
defer input1.Stop()

// run input
input2.Run()
defer input2.Stop()

for _, event := range []beat.Event{<-eventsCh, <-eventsCh} {
assertEventMatches(t, expected1, event)
got++
fmt.Println(got)
}
}()
time.Sleep(2 * time.Second)
time.Sleep(time.Second)
if got < 2 {
require.NoError(t, fmt.Errorf("not able to get events."))
}
}

func oauth2SingleEventHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("content-type", "application/json")
_ = r.ParseForm()
if r.URL.Path == "/token" {
response := `{"instance_url": "` + serverURL + `", "expires_in": "60", "access_token": "abcd"}`
_, _ = w.Write([]byte(response))
return
}
body, _ := ioutil.ReadAll(r.Body)

switch string(body) {
case `{"channel": "/meta/handshake", "supportedConnectionTypes": ["long-polling"], "version": "1.0"}`:
_, _ = w.Write([]byte(`[{"ext":{"replay":true,"payload.format":true},"minimumVersion":"1.0","clientId":"client_id","supportedConnectionTypes":["long-polling"],"channel":"/meta/handshake","version":"1.0","successful":true}]`))
return
case `{"channel": "/meta/connect", "connectionType": "long-polling", "clientId": "client_id"} `:
if called < 1 {
atomic.AddUint64(&called, 1)
_, _ = w.Write([]byte(`[{"data": {"payload": {"CountryIso": "IN"}, "event": {"replayId":1234}}, "channel": "channel_name"}]`))
}
return
case `{
"channel": "/meta/subscribe",
"subscription": "channel_name",
"clientId": "client_id",
"ext": {
"replay": {"channel_name": "-1"}
}
}`:
_, _ = w.Write([]byte(`[{"clientId": "client_id", "channel": "/meta/subscribe", "subscription": "channel_name", "successful":true}]`))
return
default:
}
}

func oauth2Handler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("content-type", "application/json")
_ = r.ParseForm()
Expand All @@ -363,9 +329,11 @@ func oauth2Handler(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`[{"ext":{"replay":true,"payload.format":true},"minimumVersion":"1.0","clientId":"client_id","supportedConnectionTypes":["long-polling"],"channel":"/meta/handshake","version":"1.0","successful":true}]`))
return
case `{"channel": "/meta/connect", "connectionType": "long-polling", "clientId": "client_id"} `:
if called < 2 {
if called < uint64(inputType) {
kush-elastic marked this conversation as resolved.
Show resolved Hide resolved
atomic.AddUint64(&called, 1)
_, _ = w.Write([]byte(`[{"data": {"payload": {"CountryIso": "IN"}, "event": {"replayId":1234}}, "channel": "channel_name"}]`))
} else {
_, _ = w.Write([]byte(`{}`))
}
return
case `{
Expand Down