-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add cometd input for Salesforce connector #1
Add cometd input for Salesforce connector #1
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
General comments
It would great to introduce an integration test. See MQTT input and mosquito server:
https://github.com/elastic/beats/blob/23e4403ae093fcc8f7905345cad2c7ad256976d8/filebeat/input/mqtt/mqtt_integration_test.go
https://github.com/elastic/beats/tree/23e4403ae093fcc8f7905345cad2c7ad256976d8/filebeat/input/mqtt
} | ||
|
||
func (in *cometdInput) newPubsubClient(ctx context.Context) (*http.Client, error) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: empty line
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's Done.
httpcommon.WithKeepaliveSettings{Disable: true}, | ||
) | ||
if err != nil { | ||
return nil, err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use errors.Wrap
with context everywhere.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated with: error.Errorf
with context
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose you want to keep it here to accumulate also other PRs. What would be the cut-off point, I mean, when do you plan to merge this PR?
in.workerWg.Add(1) | ||
go func() { | ||
in.log.Info("Pub/Sub input worker has started.") | ||
defer in.log.Info("Pub/Sub input worker has stopped.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just double-checking, is the order of these defers important?
|
||
// For testing http client | ||
// Start of the test code for authentication and client creation | ||
baseUrl := "<put base URL here>" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you plan to add the baseUrl
to the configuration?
httpcommon.WithKeepaliveSettings{Disable: true}, | ||
) | ||
if err != nil { | ||
return nil, fmt.Errorf("cometd client: error on newHTTPClient: %v", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: a typo in comment? newHTTPClient
}) | ||
} | ||
|
||
// Stop stops the pubsub input and waits for it to fully stop. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AFAIR It should be the other way round: Stop
just stops the logic, but doesn't wait, but Wait
can call the Stop
and wait until all processing is done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First of all, you're mixing the CometD/Bayeux concept and the Salesforce implementation. It doesn't mean that every CometD server requires authorization (any, not just OAuth) or it's Salesforce :) Please take a look at the httpjson
implementation to see how it interacts with different auth providers. The other way would be implementing the input as salesforce-cometd
, but I see the value in having a generic cometd
input, so let's try to make it generic.
Major issues to be adjusted:
- The input must support the graceful shutdown.
- Input tests are missing. All paths should be covered.
- Make the input as stateless as possible. Do not create resources while the input is running if it isn't necessary. It will impact the overall performance.
As this PR include the Filebeat input, please invite also Tiago.
// or more contributor license agreements. Licensed under the Elastic License; | ||
// you may not use this file except in compliance with the Elastic License. | ||
|
||
package cometd |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This entire file is missing unit tests. It's a must-have to cover all code paths.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have covered the base unit test case for the creation of the new input using the NewInput function in input_test.go
. We're unsure how we can write the unit test cases for other functions and paths the code falls into, so we'll check that out.
select { | ||
case e := <-in.out: | ||
if !e.Successful { | ||
if e.Data.Object == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comment please, why e.Data.Object
can be nil?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added the appropriate comment, thanks!
if err != nil { | ||
return fmt.Errorf("JSON error: %v", err) | ||
} | ||
err = json.Unmarshal(e.Data.Object, &event) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comment please, why json.Marshal, then json.Marshal?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added the appropriate comment, thanks!
type Status struct { | ||
channels []string | ||
clientID string | ||
connected bool | ||
} | ||
|
||
type BayeuxHandshake []struct { | ||
ClientID string `json:"clientId"` | ||
Channel string `json:"channel"` | ||
Ext struct { | ||
Replay bool `json:"replay"` | ||
} `json:"ext"` | ||
MinimumVersion string `json:"minimumVersion"` | ||
Successful bool `json:"successful"` | ||
SupportedConnectionTypes []string `json:"supportedConnectionTypes"` | ||
Version string `json:"version"` | ||
} | ||
|
||
type Subscription struct { | ||
ClientID string `json:"clientId"` | ||
Channel string `json:"channel"` | ||
Subscription string `json:"subscription"` | ||
Successful bool `json:"successful"` | ||
} | ||
|
||
type Credentials struct { | ||
AccessToken string `json:"access_token"` | ||
InstanceURL string `json:"instance_url"` | ||
IssuedAt string `json:"issued_at"` | ||
ID string `json:"id"` | ||
TokenType string `json:"token_type"` | ||
Signature string `json:"signature"` | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do these values need to be exposed (Status, BayeuxHandshake, Subscription, Credentials)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated it.
req.Header.Add("Content-Type", "application/json") | ||
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", b.creds.AccessToken)) | ||
// Passing back cookies is required though undocumented in Salesforce API | ||
// We were unable to get process working without passing cookies back to SF server. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can strip this line from here :)
return sub, nil | ||
} | ||
|
||
func (b *Bayeux) connect(out chan TriggerEvent, log *logp.Logger) (chan TriggerEvent, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This goroutine never finishes. Please adjust the whole input, so it can be gracefully shut down.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are closing the channel before the input shuts down, so I think it is shutting down gracefully.
"client_secret": {o.ClientSecret}, | ||
"username": {o.User}, | ||
"password": {o.Password}} | ||
res, err := http.PostForm(route, params) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: response
) | ||
|
||
var ( | ||
status = Status{[]string{}, "", false} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this? why is it global? What will happen if there are multiple configs enabled?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- It is used for storing all the channels that are being subscribed and the client IDs
- It has been declared as global in order to store multiple channels
- The channel names from all the enabled configs would be stored there
return nil, fmt.Errorf("bad Call request: %v", err) | ||
} | ||
req.Header.Add("Content-Type", "application/json") | ||
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", b.creds.AccessToken)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Depending on the implementation, Salesforce oriented or general, this can be optional, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are not quite sure about this one.
// or more contributor license agreements. Licensed under the Elastic License; | ||
// you may not use this file except in compliance with the Elastic License. | ||
|
||
package cometd |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not convinced about keeping all the logic in a single file, I think you have to adjust it based on the SoC principle.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, we tried following the gcppubsub
and that's why we kept it all in a single file.
We have tried to make this input generic but at some places like authorization, we had to go Salesforce specific due to the time constraint. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Temporary block
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removing the block, review from the team pending.
Raised a new PR to elastic:master branch. Please refer the same and ignore this one. |
Closing this PR as we have raised a PR in the elastic/beats repo. |
What does this PR do?
Real-time logs from the Salesforce Streaming API can be collected using the cometD input. For example, live Login & Logout related logs.
Right now with this PR, when the cometd input is enabled, you can start seeing the authentication log messages that are retrieved after successful authentication using the credentials provided as a part of the configuration. You will also be able to see the real-time data being collected from the Salesforce Steaming API.
For configuration:
Why is it important?
Checklist
CHANGELOG.next.asciidoc
orCHANGELOG-developer.next.asciidoc
.