-
Notifications
You must be signed in to change notification settings - Fork 322
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: regulation api support for Universal Analytics #2632
Conversation
Codecov ReportBase: 45.85% // Head: 46.24% // Increases project coverage by
Additional details and impacted files@@ Coverage Diff @@
## master #2632 +/- ##
==========================================
+ Coverage 45.85% 46.24% +0.38%
==========================================
Files 295 295
Lines 48503 48610 +107
==========================================
+ Hits 22243 22479 +236
+ Misses 24859 24718 -141
- Partials 1401 1413 +12
Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here. ☔ View full report at Codecov. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Partial review
…r only oauth destinations
# Conflicts: # regulation-worker/cmd/main.go # regulation-worker/internal/delete/api/api.go
When the status undefined(model.JobStatusUndefined) is returned, we can understand it is due to one of two things | ||
1. The error that was received was not an error that could trigger Refresh flow | ||
2. The destination itself is not OAuth one in the first place so it doesn't have errorCategory at all | ||
* | ||
*/ | ||
func (api *APIManager) handleRefreshFlow(params *handleRefreshFlowParams) model.JobStatus { | ||
if params.isRefresh { | ||
pkgLogger.Debugf("Refresh flow triggered for %v\n", params.destName) | ||
// Refresh OAuth flow | ||
var refSecret *oauth.AuthResponse | ||
var errCatStatusCode int | ||
refTokenParams := &oauth.RefreshTokenParams{ | ||
Secret: params.secret, | ||
WorkspaceId: params.workspaceId, | ||
AccountId: params.accountId, | ||
DestDefName: params.destName, | ||
EventNamePrefix: "refresh_token", | ||
} | ||
errCatStatusCode, refSecret = api.OAuth.RefreshToken(refTokenParams) | ||
refSec := *refSecret | ||
if strings.TrimSpace(refSec.Err) != "" { | ||
// There is an error occurring | ||
pkgLogger.Warnf("Error: %v, Status: %v", refSec.Err, errCatStatusCode) | ||
} | ||
// Refresh is complete, the job has to be re-tried | ||
return model.JobStatusFailed | ||
} | ||
// Indicates that OAuth refresh flow is not triggered | ||
return model.JobStatusUndefined | ||
} | ||
|
||
func (api *APIManager) getOAuthDestResponseStatus(tokenInfo *OAuthTokenResult, jobResp []JobRespSchema, job model.Job, destDetail model.Destination) model.JobStatus { | ||
status := model.JobStatusUndefined | ||
if tokenInfo.IsOAuthEnabled { | ||
shouldRefresh := shouldRefresh(jobResp) | ||
status = api.handleRefreshFlow(&handleRefreshFlowParams{ | ||
isRefresh: shouldRefresh, | ||
secret: tokenInfo.AccountSecretInfo.Account.Secret, | ||
workspaceId: job.WorkspaceID, | ||
accountId: tokenInfo.DeleteAccountId, | ||
destName: destDetail.Name, | ||
}) | ||
} | ||
return status | ||
} | ||
|
||
type OAuthTokenResult struct { | ||
AccountSecretInfo *oauth.AuthResponse | ||
DeleteAccountId string | ||
IsOAuthEnabled bool | ||
} | ||
|
||
func setOAuthTokenInfo(tokenInfo OAuthTokenResult, req *http.Request) error { | ||
if tokenInfo.IsOAuthEnabled { | ||
// setting oauth related information | ||
payload, marshalErr := json.Marshal(tokenInfo.AccountSecretInfo.Account) | ||
if marshalErr != nil { | ||
marshalFailErr := fmt.Sprintf("error while marshalling account secret information: %v", marshalErr) | ||
pkgLogger.Errorf(marshalFailErr) | ||
return errors.New(marshalFailErr) | ||
} | ||
req.Header.Set("X-Rudder-Dest-Info", string(payload)) | ||
} | ||
return nil | ||
} | ||
|
||
func (api *APIManager) getOAuthTokenInfo(workspaceId string, destDetail *model.Destination) (*OAuthTokenResult, error) { | ||
var tokenStatusCode int | ||
var accountSecretInfo *oauth.AuthResponse | ||
// identifier to know if the destination supports OAuth for regulation-api | ||
oAuthParamsResult := oauth.GetOAuthParams(oauth.OAuthParams{ | ||
DestConfig: destDetail.Config, | ||
DestDefConfig: destDetail.DestDefConfig, | ||
IdKey: "rudderDeleteAccountId", | ||
}) | ||
if oAuthParamsResult.Enabled { | ||
// Fetch Token call | ||
// Get Access Token Information to send it as part of the event | ||
tokenStatusCode, accountSecretInfo = api.OAuth.FetchToken(&oauth.RefreshTokenParams{ | ||
AccountId: oAuthParamsResult.AccountId, | ||
WorkspaceId: workspaceId, | ||
DestDefName: destDetail.Name, | ||
EventNamePrefix: "fetch_token", | ||
}) | ||
pkgLogger.Debugf(`[%s][FetchToken] Token Fetch Method finished (statusCode, value): (%v, %+v)`, destDetail.Name, tokenStatusCode, accountSecretInfo) | ||
if tokenStatusCode != http.StatusOK { | ||
fetchFailStr := fmt.Sprintf("[%s][FetchToken] Error in Token Fetch statusCode: %d\t error: %s\n", destDetail.Name, tokenStatusCode, accountSecretInfo.Err) | ||
pkgLogger.Errorf(fetchFailStr) | ||
return nil, errors.New(fetchFailStr) | ||
} | ||
|
||
} else { | ||
pkgLogger.Errorf("[%v] Destination probably doesn't support OAuth or some issue happened while doing OAuth for deletion [Enabled: %v]", destDetail.Name, oAuthParamsResult.Enabled) | ||
} | ||
return &OAuthTokenResult{ | ||
AccountSecretInfo: accountSecretInfo, | ||
DeleteAccountId: oAuthParamsResult.AccountId, | ||
IsOAuthEnabled: oAuthParamsResult.Enabled, | ||
}, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We seem to need too much worker-specific code for managing an oauth flow. This could be an indication that we are missing the proper abstractions for a truly generic oauth service.
Why do we need all this custom code here and why does it need to differ from the code that router uses? Can we achieve the same result through a generic oath http interceptor, reused with minimal effort wherever we have a need for performing oauth in our requests?
Our goal is to perform an http request using an oauth token & retry this request if it fails due to an expired oauth token after refreshing it and all this transparently from the caller (router/regulation-worker). IMO this is a job for a generic interceptor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It makes sense to think of the oauth flow as an interceptor. IMHO it seems like the ideal approach for this kind of feature
Making changes to make sure we use this as an interceptor doesn't fit in our current plan. I have outlined a couple of reasons for it
- Timelines. We want this feature to be available for customers
- Long-term vision. As discussed in an earlier call about regulation-worker, the OAuth flow in it's entirety was to move-away from
rudder-server
. Keeping this in mind & the changes that we need to do in order for us to make sure the interceptor approach is being used will be time-taking
But we will make sure to keep this thought in mind while we work on OAuth Flow movement itself. Please let me know if this makes sense
- remove old structure - update mocks
- add a condition to check for accountId information - add a new field to capture flowType for stats & update relevant methods - add functional option pattern to modify options according to the caller
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Assuming e-2-e testing is done. Code wise looks good to me.
Had been testing for every change all along this PR |
if isOAuthEnabled { | ||
isOAuthTokenExpired := isTokenExpired(jobResp) | ||
if isOAuthTokenExpired { | ||
err = api.refreshOAuthToken(destination.Name, job.WorkspaceID, oAuthDetail) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what happens with the original request? we don't retry immediately? we return an error? is it going to be retried?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We return an error. Yes, I think it will be re-tried after sometime. @saurav-malani can you provide insight here ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@atzoum we return job status as failed to regulation manager. And, since regulation-manager has a logic to retry failed jobs after sometime, we don't need to worry about it for the time being. Although, retry over here could be improved so that we don't need to involve regulation-manager & avoid trying all users from the batch even in case of partial failure. But, that is out of scope of this PR and can be taken up in a separate one.
ref of regulation-manager logic to retry failed jobs: https://github.com/rudderlabs/data-regulation-service/blob/795a625b23ffa27c801a1ca5c983c4c1ef937288/internal/repo/dataplaneRegulation.go#L25-L28
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What will happen if we have only 1 job and its retry backoff becomes higher than the oauth token's validity period? Won't we enter an endless loop?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes @atzoum, this is a possible known issue with the current implementation. In fact, I did raise this concern is our discussion with the transformation team when you and Leo were also in the call.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there something we are afraid of to not retry the request a second time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, I think given that in response from transformer we get status corresponding to each user in the batch. It should be quite simple. And, I don't see any problem.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently the userIds for which the failure happens is not being sent in the response. But it can be captured & server needs to make changes to it's response schema to capture that detail & process accordingly which I think is not in scope of this PR(I mean handling partial failures).
As far as retry goes, would 1 retry solve the problem or should we retry multiple times ?!
if isOAuthEnabled { | ||
isOAuthTokenExpired := isTokenExpired(jobResp) | ||
if isOAuthTokenExpired { | ||
err = api.refreshOAuthToken(destination.Name, job.WorkspaceID, oAuthDetail) | ||
if err != nil { | ||
pkgLogger.Error(err) | ||
return model.JobStatusFailed | ||
} | ||
// retry the request | ||
pkgLogger.Debug("Retrying deleteRequest job for the whole batch") | ||
return api.Delete(ctx, job, destination) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if isOAuthEnabled { | |
isOAuthTokenExpired := isTokenExpired(jobResp) | |
if isOAuthTokenExpired { | |
err = api.refreshOAuthToken(destination.Name, job.WorkspaceID, oAuthDetail) | |
if err != nil { | |
pkgLogger.Error(err) | |
return model.JobStatusFailed | |
} | |
// retry the request | |
pkgLogger.Debug("Retrying deleteRequest job for the whole batch") | |
return api.Delete(ctx, job, destination) | |
} | |
} | |
if isOAuthEnabled && isTokenExpired(jobResp) { | |
err = api.refreshOAuthToken(destination.Name, job.WorkspaceID, oAuthDetail) | |
if err != nil { | |
pkgLogger.Error(err) | |
return model.JobStatusFailed | |
} | |
// retry the request | |
pkgLogger.Debug("Retrying deleteRequest job for the whole batch") | |
return api.Delete(ctx, job, destination) | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
Description
In this PR, we have included the Regulation API support for Universal Analytics(GA) which uses OAuth for processing the user deletion request
This work would unblock any other destinations that might use OAuth for processing delete user requests
Notion Ticket
OAuth in regulation Worker module
Security