
Add cometd input for Salesforce connector #1

Closed

Conversation

kush-elastic
Owner

@kush-elastic kush-elastic commented Nov 26, 2021

  • Enhancement

What does this PR do?

This PR adds a cometd input that collects real-time logs from the Salesforce Streaming API, for example live login and logout events.

With this PR, enabling the cometd input logs the authentication messages returned after a successful login with the credentials provided in the configuration. It also shows the real-time data being collected from the Salesforce Streaming API.
Example configuration:

- type: cometd
  channel_name: /event/LoginEventStream
  auth.oauth2:
    client.id: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
    client.secret: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
    token_url: https://login.salesforce.com/services/oauth2/token
    user: [email protected]
    password: P@$$W0₹D
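
A minimal sketch of how the settings above might map onto a Go config struct, assuming the `config` struct-tag convention that Beats inputs use for unpacking. The type names, field names, and validation rules are hypothetical illustrations, not the PR's actual code.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// oauth2Config mirrors the auth.oauth2 block of the example configuration.
// The struct and its field names are assumptions made for illustration.
type oauth2Config struct {
	ClientID     string `config:"client.id"`
	ClientSecret string `config:"client.secret"`
	TokenURL     string `config:"token_url"`
	User         string `config:"user"`
	Password     string `config:"password"`
}

// config mirrors the top-level input settings.
type config struct {
	ChannelName string       `config:"channel_name"`
	OAuth2      oauth2Config `config:"auth.oauth2"`
}

// Validate rejects configurations that cannot work: Bayeux channel names
// start with "/", and the token URL is needed to authenticate.
func (c *config) Validate() error {
	if !strings.HasPrefix(c.ChannelName, "/") {
		return errors.New("channel_name must start with '/'")
	}
	if c.OAuth2.TokenURL == "" {
		return errors.New("auth.oauth2.token_url is required")
	}
	return nil
}

func main() {
	c := config{ChannelName: "/event/LoginEventStream"}
	// token_url is not set here, so Validate reports an error.
	fmt.Println(c.Validate())
}
```

Validating at configuration time means a misconfigured input fails at startup instead of on the first request.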

Why is it important?

  • It is a prerequisite for the Salesforce Observability Connector for the collection of real-time data from the Streaming APIs.
  • Many services publish logs over CometD, so a dedicated Filebeat input for retrieving them is useful.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding change to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.


@mtojek mtojek left a comment


}

func (in *cometdInput) newPubsubClient(ctx context.Context) (*http.Client, error) {


nit: empty line

Owner Author

It's Done.

httpcommon.WithKeepaliveSettings{Disable: true},
)
if err != nil {
return nil, err

Please use errors.Wrap with context everywhere.

Owner Author

Updated to use fmt.Errorf with context.
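
For readers comparing the two approaches: `errors.Wrap` comes from the third-party `github.com/pkg/errors` package, while the standard library (Go 1.13 and later) offers `fmt.Errorf` with the `%w` verb. The sketch below is an illustration, not the PR's code; `newHTTPClient` and `errNoTransport` are hypothetical stand-ins. It shows that `%w` keeps the original error matchable, whereas `%v` only keeps its text.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoTransport stands in for whatever error the HTTP client setup returns.
var errNoTransport = errors.New("no transport configured")

// newHTTPClient is a hypothetical stand-in that always fails.
func newHTTPClient() error { return errNoTransport }

// newClient adds context and wraps the cause with %w, so callers can still
// match the underlying error with errors.Is.
func newClient() error {
	if err := newHTTPClient(); err != nil {
		return fmt.Errorf("cometd client: creating HTTP client: %w", err)
	}
	return nil
}

// isNoTransport reports whether err wraps errNoTransport.
func isNoTransport(err error) bool { return errors.Is(err, errNoTransport) }

func main() {
	err := newClient()
	fmt.Println(err)                // cometd client: creating HTTP client: no transport configured
	fmt.Println(isNoTransport(err)) // true
}
```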

@kush-elastic kush-elastic marked this pull request as ready for review December 1, 2021 11:40
@kush-elastic kush-elastic marked this pull request as draft December 1, 2021 11:41

@mtojek mtojek left a comment


I suppose you want to keep this PR open to accumulate other PRs as well. What is the cut-off point, i.e. when do you plan to merge this PR?

in.workerWg.Add(1)
go func() {
in.log.Info("Pub/Sub input worker has started.")
defer in.log.Info("Pub/Sub input worker has stopped.")

Just double-checking, is the order of these defers important?
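
As background for this question: deferred calls in Go run in last-in, first-out order. The sketch below assumes the worker also defers a WaitGroup `Done` call before the log statement, which the excerpt above does not show; the strings only record the order.

```go
package main

import "fmt"

// worker records the order in which its deferred calls run.
// The named result lets the deferred closures append to the returned slice.
func worker() (order []string) {
	defer func() { order = append(order, "workerWg.Done") }() // deferred first, runs last
	defer func() { order = append(order, "log stopped") }()   // deferred last, runs first
	order = append(order, "work")
	return order
}

func main() {
	fmt.Println(worker()) // [work log stopped workerWg.Done]
}
```

With this ordering, the "stopped" message is logged before anything waiting on the WaitGroup is released.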


// For testing http client
// Start of the test code for authentication and client creation
baseUrl := "<put base URL here>"

Do you plan to add the baseUrl to the configuration?

httpcommon.WithKeepaliveSettings{Disable: true},
)
if err != nil {
return nil, fmt.Errorf("cometd client: error on newHTTPClient: %v", err)

nit: a typo in comment? newHTTPClient

})
}

// Stop stops the pubsub input and waits for it to fully stop.

AFAIR it should be the other way round: Stop only signals the logic to stop and does not wait, while Wait can call Stop and then block until all processing is done.
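
The Stop/Wait split described in this comment can be sketched as follows. This is a minimal illustration under stated assumptions: the `input` type, its fields, and `newInput` are hypothetical, and the worker body is a placeholder for the real processing loop.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// input is a hypothetical stand-in for the cometd input.
type input struct {
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

func newInput() *input {
	ctx, cancel := context.WithCancel(context.Background())
	return &input{ctx: ctx, cancel: cancel}
}

// Run starts the worker goroutine.
func (in *input) Run() {
	in.wg.Add(1)
	go func() {
		defer in.wg.Done()
		<-in.ctx.Done() // placeholder for the real work loop
	}()
}

// Stop signals the worker to exit and returns immediately.
func (in *input) Stop() { in.cancel() }

// Wait stops the input and blocks until the worker has exited.
func (in *input) Wait() {
	in.Stop()
	in.wg.Wait()
}

func main() {
	in := newInput()
	in.Run()
	in.Wait()
	fmt.Println("stopped:", in.ctx.Err() != nil) // stopped: true
}
```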

@yug-crest yug-crest changed the title from "Add authentication mechanism for cometd input for Salesforce connector" to "Add cometd input for Salesforce connector" Dec 8, 2021
@yug-crest yug-crest marked this pull request as ready for review December 16, 2021 13:30
@yug-crest yug-crest self-assigned this Dec 24, 2021

@mtojek mtojek left a comment


First of all, you're mixing the CometD/Bayeux concept and the Salesforce implementation. Not every CometD server requires authorization (of any kind, not just OAuth), and not every CometD server is Salesforce :) Please take a look at the httpjson implementation to see how it interacts with different auth providers. The alternative would be to implement the input as salesforce-cometd, but I see the value in having a generic cometd input, so let's try to make it generic.

Major issues to be adjusted:

  1. The input must support the graceful shutdown.
  2. Input tests are missing. All paths should be covered.
  3. Make the input as stateless as possible. Do not create resources while the input is running if it isn't necessary. It will impact the overall performance.

As this PR includes a Filebeat input, please also invite Tiago.

// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package cometd

This entire file is missing unit tests. It's a must-have to cover all code paths.


We have covered the base unit test case for the creation of the new input using the NewInput function in input_test.go. We're unsure how we can write the unit test cases for other functions and paths the code falls into, so we'll check that out.
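
One common way to cover the network paths in Go is to point the client at a fake server from `net/http/httptest`. The sketch below is an assumption-laden illustration, not the PR's code: `handshake`, `handshakeReply`, `fakeServer`, and `tryHandshake` are hypothetical names, and the reply struct keeps only the `clientId` and `successful` fields seen in the handshake type under review.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// handshakeReply holds the handshake response fields used in this sketch.
type handshakeReply struct {
	ClientID   string `json:"clientId"`
	Successful bool   `json:"successful"`
}

// handshake posts a /meta/handshake message to url and returns the client ID.
func handshake(client *http.Client, url string) (string, error) {
	body := `[{"channel":"/meta/handshake","version":"1.0","supportedConnectionTypes":["long-polling"]}]`
	resp, err := client.Post(url, "application/json", strings.NewReader(body))
	if err != nil {
		return "", fmt.Errorf("handshake request: %w", err)
	}
	defer resp.Body.Close()
	var replies []handshakeReply
	if err := json.NewDecoder(resp.Body).Decode(&replies); err != nil {
		return "", fmt.Errorf("decoding handshake reply: %w", err)
	}
	if len(replies) == 0 || !replies[0].Successful {
		return "", fmt.Errorf("handshake rejected")
	}
	return replies[0].ClientID, nil
}

// fakeServer returns a test server that answers every request with reply.
func fakeServer(reply string) *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		fmt.Fprint(w, reply)
	}))
}

// tryHandshake runs handshake against a fake server that sends reply.
func tryHandshake(reply string) (string, error) {
	srv := fakeServer(reply)
	defer srv.Close()
	return handshake(srv.Client(), srv.URL)
}

func main() {
	id, err := tryHandshake(`[{"clientId":"abc","successful":true}]`)
	fmt.Println(id, err) // abc <nil>
	_, err = tryHandshake(`[{"successful":false}]`)
	fmt.Println(err) // handshake rejected
}
```

In a real `_test.go` file the same fake server would sit inside table-driven test cases, one per reply shape (success, rejection, malformed JSON).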

select {
case e := <-in.out:
if !e.Successful {
if e.Data.Object == nil {

Please add a comment: why can e.Data.Object be nil?


Added the appropriate comment, thanks!

if err != nil {
return fmt.Errorf("JSON error: %v", err)
}
err = json.Unmarshal(e.Data.Object, &event)

Please add a comment: why json.Marshal, then json.Unmarshal?


Added the appropriate comment, thanks!
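
One common reason for a Marshal followed by an Unmarshal is converting a loosely typed value into a typed struct by round-tripping it through JSON. Whether that is the reason in this PR is not visible from the excerpt, so the sketch below is only an illustration; `loginEvent` and `toLoginEvent` are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// loginEvent is a hypothetical typed view of an event payload.
type loginEvent struct {
	Username string `json:"Username"`
	Status   string `json:"Status"`
}

// toLoginEvent converts a generic decoded payload into a loginEvent by
// encoding it back to JSON and decoding it into the typed struct.
func toLoginEvent(payload map[string]interface{}) (loginEvent, error) {
	var ev loginEvent
	raw, err := json.Marshal(payload)
	if err != nil {
		return ev, fmt.Errorf("encoding payload: %w", err)
	}
	if err := json.Unmarshal(raw, &ev); err != nil {
		return ev, fmt.Errorf("decoding payload: %w", err)
	}
	return ev, nil
}

func main() {
	ev, err := toLoginEvent(map[string]interface{}{"Username": "jane", "Status": "Success"})
	fmt.Println(ev.Username, ev.Status, err) // jane Success <nil>
}
```

The round trip costs an extra encode and decode per event, which is why a short comment explaining it helps the next reader.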

Comment on lines 209 to 241
type Status struct {
	channels  []string
	clientID  string
	connected bool
}

type BayeuxHandshake []struct {
	ClientID string `json:"clientId"`
	Channel  string `json:"channel"`
	Ext      struct {
		Replay bool `json:"replay"`
	} `json:"ext"`
	MinimumVersion           string   `json:"minimumVersion"`
	Successful               bool     `json:"successful"`
	SupportedConnectionTypes []string `json:"supportedConnectionTypes"`
	Version                  string   `json:"version"`
}

type Subscription struct {
	ClientID     string `json:"clientId"`
	Channel      string `json:"channel"`
	Subscription string `json:"subscription"`
	Successful   bool   `json:"successful"`
}

type Credentials struct {
	AccessToken string `json:"access_token"`
	InstanceURL string `json:"instance_url"`
	IssuedAt    string `json:"issued_at"`
	ID          string `json:"id"`
	TokenType   string `json:"token_type"`
	Signature   string `json:"signature"`
}

Do these values need to be exposed (Status, BayeuxHandshake, Subscription, Credentials)?


@yug-crest yug-crest Dec 24, 2021


Updated it.

req.Header.Add("Content-Type", "application/json")
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", b.creds.AccessToken))
// Passing back cookies is required though undocumented in Salesforce API
// We were unable to get process working without passing cookies back to SF server.

I think you can strip this line from here :)

return sub, nil
}

func (b *Bayeux) connect(out chan TriggerEvent, log *logp.Logger) (chan TriggerEvent, error) {

This goroutine never finishes. Please adjust the whole input, so it can be gracefully shut down.


We are closing the channel before the input shuts down, so I think it is shutting down gracefully.
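
For context on this exchange: closing a channel from the consumer side does not by itself end a goroutine that is still producing, and a later send on the closed channel would panic. A frequently used alternative, sketched below under stated assumptions, is to cancel a context and let the producer close the channel on its way out. The `connect` signature here differs from the PR's, and `poll` is a hypothetical stand-in for one Bayeux connect round trip.

```go
package main

import (
	"context"
	"fmt"
)

// connect starts a polling goroutine that forwards messages until ctx is
// cancelled. The producer owns the channel and closes it when it exits.
func connect(ctx context.Context, poll func() string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for ctx.Err() == nil {
			msg := poll()
			select {
			case out <- msg:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// firstThenStop reads one event, cancels, and drains until the goroutine
// has closed the channel, which proves the goroutine finished.
func firstThenStop() string {
	ctx, cancel := context.WithCancel(context.Background())
	events := connect(ctx, func() string { return "event" })
	first := <-events
	cancel()
	for range events {
	}
	return first
}

func main() {
	fmt.Println(firstThenStop()) // event
}
```

A real poll would also need to honor the context, for example by building its HTTP request with `http.NewRequestWithContext`, so that a long-polling call does not delay shutdown.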

"client_secret": {o.ClientSecret},
"username": {o.User},
"password": {o.Password}}
res, err := http.PostForm(route, params)

nit: response

)

var (
status = Status{[]string{}, "", false}

What is this? why is it global? What will happen if there are multiple configs enabled?


  1. It is used for storing all the channels that are being subscribed and the client IDs
  2. It has been declared as global in order to store multiple channels
  3. The channel names from all the enabled configs would be stored there
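
The concern behind the question is that a package-level variable is shared by every enabled config, so channels and client IDs from different inputs end up mixed together. A minimal sketch of the per-instance alternative follows; the lowercase `status` type, its methods, and this `cometdInput` layout are illustrative assumptions, not the PR's code.

```go
package main

import (
	"fmt"
	"sync"
)

// status tracks subscription state for a single input instance.
type status struct {
	mu        sync.Mutex
	channels  []string
	clientID  string
	connected bool
}

// addChannel records a subscribed channel; the mutex guards concurrent use.
func (s *status) addChannel(name string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.channels = append(s.channels, name)
}

// channelCount returns how many channels this instance has recorded.
func (s *status) channelCount() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.channels)
}

// cometdInput owns its status, so two configured inputs never share state.
type cometdInput struct {
	status status
}

func main() {
	a, b := &cometdInput{}, &cometdInput{}
	a.status.addChannel("/event/LoginEventStream")
	b.status.addChannel("/event/LogoutEventStream")
	fmt.Println(a.status.channelCount(), b.status.channelCount()) // 1 1
}
```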

return nil, fmt.Errorf("bad Call request: %v", err)
}
req.Header.Add("Content-Type", "application/json")
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", b.creds.AccessToken))

Depending on the implementation, Salesforce oriented or general, this can be optional, right?


We are not quite sure about this one.
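
One way to make the Authorization header optional in a generic input is to put authentication behind a small interface, with a no-op implementation for servers that need none. The sketch below is a hypothetical illustration: `authenticator`, `noAuth`, `bearerAuth`, `newBayeuxRequest`, and the example URL are all assumptions, not names from the PR or from httpjson.

```go
package main

import (
	"fmt"
	"net/http"
)

// authenticator decorates outgoing requests with credentials, if any.
type authenticator interface {
	apply(req *http.Request)
}

// noAuth is for CometD servers that require no authorization.
type noAuth struct{}

func (noAuth) apply(*http.Request) {}

// bearerAuth adds an OAuth2-style bearer token, as Salesforce expects.
type bearerAuth struct{ token string }

func (b bearerAuth) apply(req *http.Request) {
	req.Header.Set("Authorization", "Bearer "+b.token)
}

// newBayeuxRequest builds a POST and lets auth decide whether to add credentials.
func newBayeuxRequest(url string, auth authenticator) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, url, nil)
	if err != nil {
		return nil, fmt.Errorf("building request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	auth.apply(req)
	return req, nil
}

// authHeader returns the Authorization header of a request built with auth.
func authHeader(auth authenticator) string {
	req, err := newBayeuxRequest("https://example.invalid/cometd", auth)
	if err != nil {
		return ""
	}
	return req.Header.Get("Authorization")
}

func main() {
	fmt.Printf("%q\n", authHeader(noAuth{}))                 // ""
	fmt.Printf("%q\n", authHeader(bearerAuth{token: "abc"})) // "Bearer abc"
}
```

With this shape, the Bayeux code never mentions Salesforce; the choice of authenticator is made once, from configuration.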

// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package cometd

I'm not convinced about keeping all the logic in a single file, I think you have to adjust it based on the SoC principle.


@yug-crest yug-crest Dec 24, 2021


Right, we tried following the gcppubsub and that's why we kept it all in a single file.

@yug-crest

First of all, you're mixing the CometD/Bayeux concept and the Salesforce implementation. It doesn't mean that every CometD server requires authorization (any, not just OAuth) or it's Salesforce :) Please take a look at the httpjson implementation to see how it interacts with different auth providers. The other way would be implementing the input as salesforce-cometd, but I see the value in having a generic cometd input, so let's try to make it generic.

Major issues to be adjusted:

  1. The input must support the graceful shutdown.
  2. Input tests are missing. All paths should be covered.
  3. Make the input as stateless as possible. Do not create resources while the input is running if it isn't necessary. It will impact the overall performance.

As this PR includes a Filebeat input, please also invite Tiago.

We have tried to make this input generic, but in some places, such as authorization, we had to be Salesforce-specific due to time constraints.
Regarding the graceful shutdown: as far as we have tested, it does shut down gracefully.
We are not sure about points 2 and 3, or how to incorporate changes to address them. Can you please elaborate on them?


@andresrc andresrc left a comment


Temporary block

@yug-crest yug-crest requested a review from mtojek January 19, 2022 11:29

@andresrc andresrc left a comment


Removing the block, review from the team pending.

@yug-crest

Raised a new PR against the elastic:master branch. Please refer to it and ignore this one.
Link to the PR: elastic#30089

@yug-rajani

Closing this PR as we have raised a PR in the elastic/beats repo.
Link: elastic#30089

@yug-rajani yug-rajani closed this Mar 24, 2022

5 participants