feat: regulation api support for Universal Analytics #2632

Merged
merged 40 commits into from
Nov 24, 2022

Conversation

sanpj2292
Contributor

Description

This PR adds Regulation API support for Universal Analytics (GA), which uses OAuth to process user deletion requests.
This work unblocks any other destinations that might use OAuth to process user deletion requests.

Notion Ticket

OAuth in regulation Worker module

Security

  • The code changed/added as part of this pull request won't create any security issues with how the software is being used.

@codecov

codecov bot commented Oct 31, 2022

Codecov Report

Base: 45.85% // Head: 46.24% // Increases project coverage by +0.38% 🎉

Coverage data is based on head (4d8a6ce) compared to base (ef64bba).
Patch coverage: 80.64% of modified lines in pull request are covered.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #2632      +/-   ##
==========================================
+ Coverage   45.85%   46.24%   +0.38%     
==========================================
  Files         295      295              
  Lines       48503    48610     +107     
==========================================
+ Hits        22243    22479     +236     
+ Misses      24859    24718     -141     
- Partials     1401     1413      +12     
| Impacted Files | Coverage Δ | |
|---|---|---|
| router/router.go | 73.83% <0.00%> (+0.03%) | ⬆️ |
| router/utils/utils.go | 68.57% <ø> (+10.08%) | ⬆️ |
| runner/runner.go | 75.30% <ø> (ø) | |
| services/oauth/oauth.go | 62.99% <82.00%> (ø) | |
| regulation-worker/cmd/main.go | 67.74% <83.33%> (+7.39%) | ⬆️ |
| regulation-worker/internal/delete/api/api.go | 80.92% <85.18%> (+2.99%) | ⬆️ |
| regulation-worker/internal/delete/batch/batch.go | 50.70% <100.00%> (+0.42%) | ⬆️ |
| regulation-worker/internal/delete/delete.go | 100.00% <100.00%> (ø) | |
| ...gulation-worker/internal/delete/kvstore/kvstore.go | 80.95% <100.00%> (+3.17%) | ⬆️ |
| ...ulation-worker/internal/destination/destination.go | 68.96% <100.00%> (+1.10%) | ⬆️ |

... and 11 more


Contributor

@koladilip koladilip left a comment

Partial review

@sanpj2292 sanpj2292 marked this pull request as ready for review November 16, 2022 12:57
Comment on lines 153 to 274
When the status undefined(model.JobStatusUndefined) is returned, we can understand it is due to one of two things
1. The error that was received was not an error that could trigger Refresh flow
2. The destination itself is not OAuth one in the first place so it doesn't have errorCategory at all
*
*/
func (api *APIManager) handleRefreshFlow(params *handleRefreshFlowParams) model.JobStatus {
	if params.isRefresh {
		pkgLogger.Debugf("Refresh flow triggered for %v\n", params.destName)
		// Refresh OAuth flow
		var refSecret *oauth.AuthResponse
		var errCatStatusCode int
		refTokenParams := &oauth.RefreshTokenParams{
			Secret:          params.secret,
			WorkspaceId:     params.workspaceId,
			AccountId:       params.accountId,
			DestDefName:     params.destName,
			EventNamePrefix: "refresh_token",
		}
		errCatStatusCode, refSecret = api.OAuth.RefreshToken(refTokenParams)
		refSec := *refSecret
		if strings.TrimSpace(refSec.Err) != "" {
			// There is an error occurring
			pkgLogger.Warnf("Error: %v, Status: %v", refSec.Err, errCatStatusCode)
		}
		// Refresh is complete, the job has to be re-tried
		return model.JobStatusFailed
	}
	// Indicates that OAuth refresh flow is not triggered
	return model.JobStatusUndefined
}

func (api *APIManager) getOAuthDestResponseStatus(tokenInfo *OAuthTokenResult, jobResp []JobRespSchema, job model.Job, destDetail model.Destination) model.JobStatus {
	status := model.JobStatusUndefined
	if tokenInfo.IsOAuthEnabled {
		shouldRefresh := shouldRefresh(jobResp)
		status = api.handleRefreshFlow(&handleRefreshFlowParams{
			isRefresh:   shouldRefresh,
			secret:      tokenInfo.AccountSecretInfo.Account.Secret,
			workspaceId: job.WorkspaceID,
			accountId:   tokenInfo.DeleteAccountId,
			destName:    destDetail.Name,
		})
	}
	return status
}

type OAuthTokenResult struct {
	AccountSecretInfo *oauth.AuthResponse
	DeleteAccountId   string
	IsOAuthEnabled    bool
}

func setOAuthTokenInfo(tokenInfo OAuthTokenResult, req *http.Request) error {
	if tokenInfo.IsOAuthEnabled {
		// setting oauth related information
		payload, marshalErr := json.Marshal(tokenInfo.AccountSecretInfo.Account)
		if marshalErr != nil {
			marshalFailErr := fmt.Sprintf("error while marshalling account secret information: %v", marshalErr)
			pkgLogger.Errorf(marshalFailErr)
			return errors.New(marshalFailErr)
		}
		req.Header.Set("X-Rudder-Dest-Info", string(payload))
	}
	return nil
}

func (api *APIManager) getOAuthTokenInfo(workspaceId string, destDetail *model.Destination) (*OAuthTokenResult, error) {
	var tokenStatusCode int
	var accountSecretInfo *oauth.AuthResponse
	// identifier to know if the destination supports OAuth for regulation-api
	oAuthParamsResult := oauth.GetOAuthParams(oauth.OAuthParams{
		DestConfig:    destDetail.Config,
		DestDefConfig: destDetail.DestDefConfig,
		IdKey:         "rudderDeleteAccountId",
	})
	if oAuthParamsResult.Enabled {
		// Fetch Token call
		// Get Access Token Information to send it as part of the event
		tokenStatusCode, accountSecretInfo = api.OAuth.FetchToken(&oauth.RefreshTokenParams{
			AccountId:       oAuthParamsResult.AccountId,
			WorkspaceId:     workspaceId,
			DestDefName:     destDetail.Name,
			EventNamePrefix: "fetch_token",
		})
		pkgLogger.Debugf(`[%s][FetchToken] Token Fetch Method finished (statusCode, value): (%v, %+v)`, destDetail.Name, tokenStatusCode, accountSecretInfo)
		if tokenStatusCode != http.StatusOK {
			fetchFailStr := fmt.Sprintf("[%s][FetchToken] Error in Token Fetch statusCode: %d\t error: %s\n", destDetail.Name, tokenStatusCode, accountSecretInfo.Err)
			pkgLogger.Errorf(fetchFailStr)
			return nil, errors.New(fetchFailStr)
		}

	} else {
		pkgLogger.Errorf("[%v] Destination probably doesn't support OAuth or some issue happened while doing OAuth for deletion [Enabled: %v]", destDetail.Name, oAuthParamsResult.Enabled)
	}
	return &OAuthTokenResult{
		AccountSecretInfo: accountSecretInfo,
		DeleteAccountId:   oAuthParamsResult.AccountId,
		IsOAuthEnabled:    oAuthParamsResult.Enabled,
	}, nil
}
Contributor

@atzoum atzoum Nov 17, 2022

We seem to need too much worker-specific code for managing an oauth flow. This could be an indication that we are missing the proper abstractions for a truly generic oauth service.

Why do we need all this custom code here, and why does it need to differ from the code that the router uses? Can we achieve the same result through a generic OAuth HTTP interceptor, reused with minimal effort wherever we need to perform OAuth in our requests?

Our goal is to perform an HTTP request using an OAuth token and, if the request fails because the token has expired, to refresh the token and retry the request, all transparently to the caller (router/regulation-worker). IMO this is a job for a generic interceptor.
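
For illustration, the interceptor idea could look roughly like the sketch below, written as an `http.RoundTripper`. This is not code from this PR or from rudder-server: `TokenSource`, `oauthTransport` and `memoryTokens` are hypothetical names, and the sketch assumes that an expired token surfaces as an HTTP 401 and that the token travels in an `Authorization: Bearer` header, whereas the code under review detects expiry from the transformer response and sends account details in `X-Rudder-Dest-Info`.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// TokenSource is a hypothetical stand-in for the OAuth service.
type TokenSource interface {
	Token() (string, error)               // current (possibly stale) token
	Refresh(stale string) (string, error) // exchange a stale token for a new one
}

// oauthTransport adds the token to each request and, on a 401 response,
// refreshes the token and replays the request exactly once.
type oauthTransport struct {
	base   http.RoundTripper
	tokens TokenSource
}

func (t *oauthTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	// Buffer the body so the request can be replayed after a refresh.
	var body []byte
	if req.Body != nil {
		var err error
		body, err = io.ReadAll(req.Body)
		req.Body.Close()
		if err != nil {
			return nil, err
		}
	}
	token, err := t.tokens.Token()
	if err != nil {
		return nil, err
	}
	resp, err := t.send(req, body, token)
	if err != nil || resp.StatusCode != http.StatusUnauthorized {
		return resp, err
	}
	resp.Body.Close() // discard the 401 before retrying
	if token, err = t.tokens.Refresh(token); err != nil {
		return nil, err
	}
	return t.send(req, body, token)
}

// send works on a clone so the caller's request is never mutated.
func (t *oauthTransport) send(req *http.Request, body []byte, token string) (*http.Response, error) {
	clone := req.Clone(req.Context())
	if body != nil {
		clone.Body = io.NopCloser(bytes.NewReader(body))
	}
	clone.Header.Set("Authorization", "Bearer "+token)
	return t.base.RoundTrip(clone)
}

// memoryTokens is a toy TokenSource used only by the demo below.
type memoryTokens struct {
	current   string
	refreshes int
}

func (m *memoryTokens) Token() (string, error) { return m.current, nil }

func (m *memoryTokens) Refresh(string) (string, error) {
	m.current = "fresh"
	m.refreshes++
	return m.current, nil
}

// demo sends one request with a stale token against a test server that
// only accepts "Bearer fresh".
func demo() (status, refreshes int, err error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Header.Get("Authorization") != "Bearer fresh" {
			w.WriteHeader(http.StatusUnauthorized)
			return
		}
		w.WriteHeader(http.StatusOK)
	}))
	defer srv.Close()

	tokens := &memoryTokens{current: "stale"}
	client := &http.Client{Transport: &oauthTransport{base: http.DefaultTransport, tokens: tokens}}
	resp, err := client.Post(srv.URL, "application/json", bytes.NewReader([]byte(`{"userId":"u1"}`)))
	if err != nil {
		return 0, 0, err
	}
	defer resp.Body.Close()
	return resp.StatusCode, tokens.refreshes, nil
}

func main() {
	status, refreshes, err := demo()
	fmt.Println(status, refreshes, err)
}
```

With a transport like this, the caller only builds an `http.Client` and issues requests; the refresh and the single replay stay inside the transport, which is the "transparent to the caller" property described above.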

Contributor Author

It makes sense to think of the OAuth flow as an interceptor. IMHO it is the ideal approach for this kind of feature.

However, reworking the code to use an interceptor doesn't fit our current plan, for a couple of reasons:

  1. Timelines. We want this feature to be available for customers.
  2. Long-term vision. As discussed in an earlier call about regulation-worker, the OAuth flow in its entirety is meant to move away from rudder-server. With that in mind, the changes needed to adopt the interceptor approach would be time-consuming.

We will keep this thought in mind when we work on moving the OAuth flow itself. Please let me know if this makes sense.

saurav-malani and others added 6 commits November 18, 2022 16:39
- remove old structure
- update mocks
- add a condition to check for accountId information
- add a new field to capture flowType for stats & update relevant methods
- add functional option pattern to modify options according to the caller
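
For readers unfamiliar with the functional option pattern named in the last commit message, here is a minimal sketch. It is not the code from these commits: `oauthOptions`, `Option`, `WithFlowType`, `WithAccountIDKey`, `newOAuthOptions`, the default values and the flow type labels are all assumptions made for illustration. Only the key `rudderDeleteAccountId` appears in the snippet reviewed above.

```go
package main

import "fmt"

// oauthOptions holds per-caller settings. All names are hypothetical.
type oauthOptions struct {
	flowType     string // label attached to stats (assumed values)
	accountIDKey string // config key that holds the account id
}

// Option mutates the options; each caller passes only what it needs to change.
type Option func(*oauthOptions)

// WithFlowType overrides the flow type label used for stats.
func WithFlowType(flowType string) Option {
	return func(o *oauthOptions) { o.flowType = flowType }
}

// WithAccountIDKey overrides the config key used to look up the account id.
func WithAccountIDKey(key string) Option {
	return func(o *oauthOptions) { o.accountIDKey = key }
}

// newOAuthOptions starts from defaults (assumed here) and applies the
// caller's options in order.
func newOAuthOptions(opts ...Option) oauthOptions {
	o := oauthOptions{flowType: "delivery", accountIDKey: "rudderAccountId"}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	router := newOAuthOptions() // defaults
	worker := newOAuthOptions(
		WithFlowType("delete"),
		WithAccountIDKey("rudderDeleteAccountId"),
	)
	fmt.Printf("%+v\n%+v\n", router, worker)
}
```

The benefit for a shared OAuth service is that router and regulation-worker can call the same constructor and differ only in the options they pass.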
@saurav-malani saurav-malani self-requested a review November 21, 2022 06:40
Contributor

@saurav-malani saurav-malani left a comment

Assuming e-2-e testing is done. Code wise looks good to me.

@sanpj2292
Contributor Author

Assuming e-2-e testing is done.

I have been testing every change throughout this PR.

if isOAuthEnabled {
	isOAuthTokenExpired := isTokenExpired(jobResp)
	if isOAuthTokenExpired {
		err = api.refreshOAuthToken(destination.Name, job.WorkspaceID, oAuthDetail)
Contributor

What happens to the original request? Do we not retry it immediately? Do we return an error? Is it going to be retried?

Contributor Author

We return an error. Yes, I think it will be retried after some time. @saurav-malani, can you provide some insight here?

Contributor

@saurav-malani saurav-malani Nov 22, 2022

@atzoum we return the job status as failed to regulation-manager. Since regulation-manager has logic to retry failed jobs after some time, we don't need to worry about it for the time being. That said, the retry here could be improved so that we don't need to involve regulation-manager and so that we avoid retrying every user in the batch after a partial failure. But that is out of scope for this PR and can be taken up in a separate one.

Reference for the regulation-manager logic that retries failed jobs: https://github.com/rudderlabs/data-regulation-service/blob/795a625b23ffa27c801a1ca5c983c4c1ef937288/internal/repo/dataplaneRegulation.go#L25-L28

Contributor

@atzoum atzoum Nov 22, 2022

What will happen if we have only 1 job and its retry backoff becomes higher than the oauth token's validity period? Won't we enter an endless loop?

Contributor

@saurav-malani saurav-malani Nov 22, 2022

Yes @atzoum, this is a known possible issue with the current implementation. In fact, I raised this concern in our discussion with the transformation team, when you and Leo were also on the call.

Contributor

Is there something we are afraid of that keeps us from retrying the request a second time?

Contributor

No. Given that the response from the transformer includes a status for each user in the batch, it should be quite simple, and I don't see any problem.

Contributor Author

@sanpj2292 sanpj2292 Nov 22, 2022

Currently, the userIds for which the failure happens are not sent in the response. They can be captured, but the server needs to change its response schema to include that detail and process it accordingly, which I think is out of scope for this PR (I mean handling partial failures).

As for the retry, would 1 retry solve the problem, or should we retry multiple times?

Comment on lines 111 to 123
if isOAuthEnabled {
	isOAuthTokenExpired := isTokenExpired(jobResp)
	if isOAuthTokenExpired {
		err = api.refreshOAuthToken(destination.Name, job.WorkspaceID, oAuthDetail)
		if err != nil {
			pkgLogger.Error(err)
			return model.JobStatusFailed
		}
		// retry the request
		pkgLogger.Debug("Retrying deleteRequest job for the whole batch")
		return api.Delete(ctx, job, destination)
	}
}
Contributor

Suggested change
- if isOAuthEnabled {
- 	isOAuthTokenExpired := isTokenExpired(jobResp)
- 	if isOAuthTokenExpired {
- 		err = api.refreshOAuthToken(destination.Name, job.WorkspaceID, oAuthDetail)
- 		if err != nil {
- 			pkgLogger.Error(err)
- 			return model.JobStatusFailed
- 		}
- 		// retry the request
- 		pkgLogger.Debug("Retrying deleteRequest job for the whole batch")
- 		return api.Delete(ctx, job, destination)
- 	}
- }
+ if isOAuthEnabled && isTokenExpired(jobResp) {
+ 	err = api.refreshOAuthToken(destination.Name, job.WorkspaceID, oAuthDetail)
+ 	if err != nil {
+ 		pkgLogger.Error(err)
+ 		return model.JobStatusFailed
+ 	}
+ 	// retry the request
+ 	pkgLogger.Debug("Retrying deleteRequest job for the whole batch")
+ 	return api.Delete(ctx, job, destination)
+ }

Contributor Author

Done

6 participants