Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: regulation api support for Universal Analytics #2632

Merged
merged 40 commits into from
Nov 24, 2022
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
dfddb48
feat: regulation api support for Universal Analytics
Oct 31, 2022
f4a67e5
Move info log to debug
Oct 31, 2022
9af81ec
name, error string correction & remove unnecessary condition
Nov 1, 2022
fec7530
separate method for handling refresh flow method implemented
Nov 1, 2022
8b95415
addresses naming logging concerns
Nov 1, 2022
7609b3f
add OAuth tests
Nov 2, 2022
39550a0
add tests for oauth destination in regulation-api
Nov 2, 2022
d77c224
Merge remote-tracking branch 'origin/feat.ga-reg-support' into feat.g…
Nov 2, 2022
1db26f3
Merge remote-tracking branch 'origin/master' into feat.ga-reg-support
Nov 2, 2022
d3665d5
linter fix
Nov 2, 2022
de288dc
fix tests with dynamic mocking of cp requests, trigger refreshFlow fo…
Nov 3, 2022
6b1f817
pass destination struct to Delete method & associated tests
Nov 3, 2022
616839a
Merge remote-tracking branch 'origin/master' into feat.ga-reg-support
Nov 8, 2022
8fcec3d
Merge remote-tracking branch 'origin/master' into feat.ga-reg-support
Nov 9, 2022
1f59039
moving the fetch token to just before sending request to transformer
Nov 10, 2022
8083f7c
Merge remote-tracking branch 'origin/master' into feat.ga-reg-support
Nov 11, 2022
bcd80e3
remove comments
Nov 14, 2022
21f8113
Merge remote-tracking branch 'origin/master' into feat.ga-reg-support
Nov 14, 2022
ddefc36
refactor fetch token logic & tests
Nov 14, 2022
8878e35
refactor refresh flow handling
Nov 14, 2022
34564ab
Merge remote-tracking branch 'origin/master' into feat.ga-reg-support
Nov 14, 2022
a0cb000
change test values
Nov 15, 2022
298050d
combine oauth setup with new oauth instance initialisation
Nov 15, 2022
cb76fce
fix to retry the job when refresh token fails & add more cases around…
Nov 15, 2022
4cac21e
update test-case for fetchToken error
Nov 16, 2022
c3d5634
Merge remote-tracking branch 'origin/master' into feat.ga-reg-support
Nov 16, 2022
5a20251
Merge remote-tracking branch 'origin/master' into feat.ga-reg-support
Nov 16, 2022
a25068c
Merge remote-tracking branch 'origin/master' into feat.ga-reg-support
Nov 17, 2022
b46d62d
regulation worker API oauth code refactor
saurav-malani Nov 18, 2022
51de4a9
move oauth logic to separate folder
Nov 18, 2022
2e677f8
- update error message
Nov 20, 2022
6ba9d19
rename function parameter
Nov 20, 2022
d948275
fix: (CR) rename InitAll to init
Nov 20, 2022
94ea214
add option for oauth init in tests
Nov 21, 2022
45a473a
fix: code-review changes
Nov 22, 2022
5a48d74
feat: single retry capability added after refresh error occurs
Nov 22, 2022
fa062da
fix: update if-else block properly
Nov 23, 2022
464e11f
Merge remote-tracking branch 'origin' into feat.ga-reg-support
Nov 23, 2022
47272cd
Merge remote-tracking branch 'origin' into feat.ga-reg-support
Nov 23, 2022
4d8a6ce
Merge remote-tracking branch 'origin' into feat.ga-reg-support
Nov 23, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion regulation-worker/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,22 @@ import (
"github.com/rudderlabs/rudder-server/regulation-worker/internal/destination"
"github.com/rudderlabs/rudder-server/regulation-worker/internal/initialize"
"github.com/rudderlabs/rudder-server/regulation-worker/internal/service"
oauth "github.com/rudderlabs/rudder-server/router/oauthResponseHandler"
atzoum marked this conversation as resolved.
Show resolved Hide resolved
"github.com/rudderlabs/rudder-server/services/filemanager"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/misc"
)

var pkgLogger = logger.NewLogger().Child("regulation-worker")

func main() {
func InitAll() {
atzoum marked this conversation as resolved.
Show resolved Hide resolved
initialize.Init()
backendconfig.Init()
oauth.Init()
}
saikumarrs marked this conversation as resolved.
Show resolved Hide resolved

func main() {
InitAll()

pkgLogger.Info("starting regulation-worker")
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -58,6 +64,9 @@ func Run(ctx context.Context) {
if err != nil {
panic(fmt.Errorf("error while getting workspaceId: %w", err))
}
// setting up oauth
OAuth := oauth.NewOAuthErrorHandler(backendconfig.DefaultBackendConfig)

svc := service.JobSvc{
API: &client.JobAPI{
Client: &http.Client{Timeout: config.GetDuration("HttpClient.regulationWorker.regulationManager.timeout", 60, time.Second)},
Expand All @@ -75,6 +84,7 @@ func Run(ctx context.Context) {
&api.APIManager{
Client: &http.Client{Timeout: config.GetDuration("HttpClient.regulationWorker.transformer.timeout", 60, time.Second)},
DestTransformURL: config.MustGetString("DEST_TRANSFORM_URL"),
OAuth: OAuth,
}),
}

Expand Down
154 changes: 150 additions & 4 deletions regulation-worker/internal/delete/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,35 +8,40 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os"
"strings"

"github.com/rudderlabs/rudder-server/regulation-worker/internal/model"
oauth "github.com/rudderlabs/rudder-server/router/oauthResponseHandler"
"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/utils/logger"
)

var (
pkgLogger = logger.NewLogger().Child("api")
supportedDestinations = []string{"BRAZE", "AM", "INTERCOM", "CLEVERTAP", "AF", "MP"}
supportedDestinations = []string{"BRAZE", "AM", "INTERCOM", "CLEVERTAP", "AF", "MP", "GA"}
)

type APIManager struct {
Client *http.Client
DestTransformURL string
OAuth oauth.Authorizer
}

// prepares payload based on (job,destDetail) & make an API call to transformer.
// gets (status, failure_reason) which is converted to appropriate model.Error & returned to caller.
func (api *APIManager) Delete(ctx context.Context, job model.Job, destConfig map[string]interface{}, destName string) model.JobStatus {
func (api *APIManager) Delete(ctx context.Context, job model.Job, destDetail model.Destination) model.JobStatus {
atzoum marked this conversation as resolved.
Show resolved Hide resolved
pkgLogger.Debugf("deleting: %v", job, " from API destination: %v", destDetail.Name)
method := http.MethodPost
endpoint := "/deleteUsers"
url := fmt.Sprint(api.DestTransformURL, endpoint)

bodySchema := mapJobToPayload(job, strings.ToLower(destName), destConfig)
bodySchema := mapJobToPayload(job, strings.ToLower(destDetail.Name), destDetail.Config)
pkgLogger.Debugf("payload: %#v", bodySchema)

reqBody, err := json.Marshal(bodySchema)
if err != nil {
Expand All @@ -49,11 +54,22 @@ func (api *APIManager) Delete(ctx context.Context, job model.Job, destConfig map
}
req.Header.Set("Content-Type", "application/json")

tokenInfo, getTokenInfoErr := api.getOAuthTokenInfo(job.WorkspaceID, &destDetail)
if getTokenInfoErr != nil {
// Problem in fetching token
return model.JobStatusFailed
}
setFailErr := setOAuthTokenInfo(*tokenInfo, req)
if setFailErr != nil {
// Marshal failure
return model.JobStatusFailed
}

fileCleaningTime := stats.Default.NewTaggedStat("file_cleaning_time", stats.TimerType, stats.Tags{
"jobId": fmt.Sprintf("%d", job.ID),
"workspaceId": job.WorkspaceID,
"destType": "api",
"destName": strings.ToLower(destName),
"destName": strings.ToLower(destDetail.Name),
})
fileCleaningTime.Start()
defer fileCleaningTime.End()
Expand All @@ -76,6 +92,13 @@ func (api *APIManager) Delete(ctx context.Context, job model.Job, destConfig map
return model.JobStatusFailed
}

// Refresh Flow Start
oauthDestRespStatus := api.getOAuthDestResponseStatus(tokenInfo, jobResp, job, destDetail)
if oauthDestRespStatus != model.JobStatusUndefined {
return oauthDestRespStatus
}
// Refresh Flow ends

switch resp.StatusCode {

case http.StatusOK:
Expand Down Expand Up @@ -126,3 +149,126 @@ func mapJobToPayload(job model.Job, destName string, destConfig map[string]inter
},
}
}

type handleRefreshFlowParams struct {
secret json.RawMessage
destName string
workspaceId string
accountId string
isRefresh bool
}

func shouldRefresh(jobResponses []JobRespSchema) bool {
var isRefresh bool
for _, jobResponse := range jobResponses {
isRefresh = jobResponse.AuthErrorCategory == oauth.REFRESH_TOKEN
if isRefresh {
break
}
}
return isRefresh
}

/*
*
This method handles the refresh flow for OAuth destinations
When the status undefined(model.JobStatusUndefined) is returned, we can understand it is due to one of two things
1. The error that was received was not an error that could trigger Refresh flow
2. The destination itself is not OAuth one in the first place so it doesn't have errorCategory at all
*
*/
func (api *APIManager) handleRefreshFlow(params *handleRefreshFlowParams) model.JobStatus {
if params.isRefresh {
pkgLogger.Debugf("Refresh flow triggered for %v\n", params.destName)
// Refresh OAuth flow
var refSecret *oauth.AuthResponse
var errCatStatusCode int
refTokenParams := &oauth.RefreshTokenParams{
Secret: params.secret,
WorkspaceId: params.workspaceId,
AccountId: params.accountId,
DestDefName: params.destName,
EventNamePrefix: "refresh_token",
}
errCatStatusCode, refSecret = api.OAuth.RefreshToken(refTokenParams)
refSec := *refSecret
if strings.TrimSpace(refSec.Err) != "" {
// There is an error occurring
pkgLogger.Warnf("Error: %v, Status: %v", refSec.Err, errCatStatusCode)
}
// Refresh is complete, the job has to be re-tried
return model.JobStatusFailed
}
// Indicates that OAuth refresh flow is not triggered
return model.JobStatusUndefined
}

func (api *APIManager) getOAuthDestResponseStatus(tokenInfo *OAuthTokenResult, jobResp []JobRespSchema, job model.Job, destDetail model.Destination) model.JobStatus {
status := model.JobStatusUndefined
if tokenInfo.IsOAuthEnabled {
shouldRefresh := shouldRefresh(jobResp)
status = api.handleRefreshFlow(&handleRefreshFlowParams{
isRefresh: shouldRefresh,
secret: tokenInfo.AccountSecretInfo.Account.Secret,
workspaceId: job.WorkspaceID,
accountId: tokenInfo.DeleteAccountId,
destName: destDetail.Name,
})
}
return status
}

type OAuthTokenResult struct {
AccountSecretInfo *oauth.AuthResponse
DeleteAccountId string
IsOAuthEnabled bool
}

func setOAuthTokenInfo(tokenInfo OAuthTokenResult, req *http.Request) error {
if tokenInfo.IsOAuthEnabled {
// setting oauth related information
payload, marshalErr := json.Marshal(tokenInfo.AccountSecretInfo.Account)
if marshalErr != nil {
marshalFailErr := fmt.Sprintf("error while marshalling account secret information: %v", marshalErr)
pkgLogger.Errorf(marshalFailErr)
return errors.New(marshalFailErr)
}
req.Header.Set("X-Rudder-Dest-Info", string(payload))
}
return nil
}

func (api *APIManager) getOAuthTokenInfo(workspaceId string, destDetail *model.Destination) (*OAuthTokenResult, error) {
var tokenStatusCode int
var accountSecretInfo *oauth.AuthResponse
// identifier to know if the destination supports OAuth for regulation-api
oAuthParamsResult := oauth.GetOAuthParams(oauth.OAuthParams{
DestConfig: destDetail.Config,
DestDefConfig: destDetail.DestDefConfig,
IdKey: "rudderDeleteAccountId",
})
if oAuthParamsResult.Enabled {
// Fetch Token call
// Get Access Token Information to send it as part of the event
tokenStatusCode, accountSecretInfo = api.OAuth.FetchToken(&oauth.RefreshTokenParams{
AccountId: oAuthParamsResult.AccountId,
WorkspaceId: workspaceId,
DestDefName: destDetail.Name,
EventNamePrefix: "fetch_token",
})
pkgLogger.Debugf(`[%s][FetchToken] Token Fetch Method finished (statusCode, value): (%v, %+v)`, destDetail.Name, tokenStatusCode, accountSecretInfo)
if tokenStatusCode != http.StatusOK {
fetchFailStr := fmt.Sprintf("[%s][FetchToken] Error in Token Fetch statusCode: %d\t error: %s\n", destDetail.Name, tokenStatusCode, accountSecretInfo.Err)
pkgLogger.Errorf(fetchFailStr)
return nil, errors.New(fetchFailStr)
}

} else {
pkgLogger.Errorf("[%v] Destination probably doesn't support OAuth or some issue happened while doing OAuth for deletion [Enabled: %v]", destDetail.Name, oAuthParamsResult.Enabled)
}
return &OAuthTokenResult{
AccountSecretInfo: accountSecretInfo,
DeleteAccountId: oAuthParamsResult.AccountId,
IsOAuthEnabled: oAuthParamsResult.Enabled,
}, nil
}
Copy link
Contributor

@atzoum atzoum Nov 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We seem to need too much worker-specific code for managing an oauth flow. This could be an indication that we are missing the proper abstractions for a truly generic oauth service.

Why do we need all this custom code here and why does it need to differ from the code that router uses? Can we achieve the same result through a generic oath http interceptor, reused with minimal effort wherever we have a need for performing oauth in our requests?

Our goal is to perform an http request using an oauth token & retry this request if it fails due to an expired oauth token after refreshing it and all this transparently from the caller (router/regulation-worker). IMO this is a job for a generic interceptor

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes sense to think of the oauth flow as an interceptor. IMHO it seems like the ideal approach for this kind of feature

Making changes to make sure we use this as an interceptor doesn't fit in our current plan. I have outlined a couple of reasons for it

  1. Timelines. We want this feature to be available for customers
  2. Long-term vision. As discussed in an earlier call about regulation-worker, the OAuth flow in it's entirety was to move-away from rudder-server. Keeping this in mind & the changes that we need to do in order for us to make sure the interceptor approach is being used will be time-taking

But we will make sure to keep this thought in mind while we work on OAuth Flow movement itself. Please let me know if this makes sense

Loading