Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: regulation api support for Universal Analytics #2632

Merged
merged 40 commits into from
Nov 24, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
dfddb48
feat: regulation api support for Universal Analytics
Oct 31, 2022
f4a67e5
Move info log to debug
Oct 31, 2022
9af81ec
name, error string correction & remove unnecessary condition
Nov 1, 2022
fec7530
separate method for handling refresh flow method implemented
Nov 1, 2022
8b95415
addresses naming logging concerns
Nov 1, 2022
7609b3f
add OAuth tests
Nov 2, 2022
39550a0
add tests for oauth destination in regulation-api
Nov 2, 2022
d77c224
Merge remote-tracking branch 'origin/feat.ga-reg-support' into feat.g…
Nov 2, 2022
1db26f3
Merge remote-tracking branch 'origin/master' into feat.ga-reg-support
Nov 2, 2022
d3665d5
linter fix
Nov 2, 2022
de288dc
fix tests with dynamic mocking of cp requests, trigger refreshFlow fo…
Nov 3, 2022
6b1f817
pass destination struct to Delete method & associated tests
Nov 3, 2022
616839a
Merge remote-tracking branch 'origin/master' into feat.ga-reg-support
Nov 8, 2022
8fcec3d
Merge remote-tracking branch 'origin/master' into feat.ga-reg-support
Nov 9, 2022
1f59039
moving the fetch token to just before sending request to transformer
Nov 10, 2022
8083f7c
Merge remote-tracking branch 'origin/master' into feat.ga-reg-support
Nov 11, 2022
bcd80e3
remove comments
Nov 14, 2022
21f8113
Merge remote-tracking branch 'origin/master' into feat.ga-reg-support
Nov 14, 2022
ddefc36
refactor fetch token logic & tests
Nov 14, 2022
8878e35
refactor refresh flow handling
Nov 14, 2022
34564ab
Merge remote-tracking branch 'origin/master' into feat.ga-reg-support
Nov 14, 2022
a0cb000
change test values
Nov 15, 2022
298050d
combine oauth setup with new oauth instance initialisation
Nov 15, 2022
cb76fce
fix to retry the job when refresh token fails & add more cases around…
Nov 15, 2022
4cac21e
update test-case for fetchToken error
Nov 16, 2022
c3d5634
Merge remote-tracking branch 'origin/master' into feat.ga-reg-support
Nov 16, 2022
5a20251
Merge remote-tracking branch 'origin/master' into feat.ga-reg-support
Nov 16, 2022
a25068c
Merge remote-tracking branch 'origin/master' into feat.ga-reg-support
Nov 17, 2022
b46d62d
regulation worker API oauth code refactor
saurav-malani Nov 18, 2022
51de4a9
move oauth logic to separate folder
Nov 18, 2022
2e677f8
- update error message
Nov 20, 2022
6ba9d19
rename function parameter
Nov 20, 2022
d948275
fix: (CR) rename InitAll to init
Nov 20, 2022
94ea214
add option for oauth init in tests
Nov 21, 2022
45a473a
fix: code-review changes
Nov 22, 2022
5a48d74
feat: single retry capability added after refresh error occurs
Nov 22, 2022
fa062da
fix: update if-else block properly
Nov 23, 2022
464e11f
Merge remote-tracking branch 'origin' into feat.ga-reg-support
Nov 23, 2022
47272cd
Merge remote-tracking branch 'origin' into feat.ga-reg-support
Nov 23, 2022
4d8a6ce
Merge remote-tracking branch 'origin' into feat.ga-reg-support
Nov 23, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion regulation-worker/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,21 @@ import (
"github.com/rudderlabs/rudder-server/regulation-worker/internal/destination"
"github.com/rudderlabs/rudder-server/regulation-worker/internal/initialize"
"github.com/rudderlabs/rudder-server/regulation-worker/internal/service"
oauth "github.com/rudderlabs/rudder-server/router/oauthResponseHandler"
atzoum marked this conversation as resolved.
Show resolved Hide resolved
"github.com/rudderlabs/rudder-server/services/filemanager"
"github.com/rudderlabs/rudder-server/utils/logger"
)

var pkgLogger = logger.NewLogger().Child("regulation-worker")

func main() {
func InitAll() {
atzoum marked this conversation as resolved.
Show resolved Hide resolved
initialize.Init()
backendconfig.Init()
oauth.Init()
}
saikumarrs marked this conversation as resolved.
Show resolved Hide resolved

func main() {
InitAll()

pkgLogger.Info("starting regulation-worker")
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -57,6 +63,9 @@ func Run(ctx context.Context) {
if err != nil {
panic(fmt.Errorf("error while getting workspaceId: %w", err))
}
// setting up oauth
OAuth := oauth.NewOAuthErrorHandler(backendconfig.DefaultBackendConfig)
OAuth.Setup()
saurav-malani marked this conversation as resolved.
Show resolved Hide resolved

svc := service.JobSvc{
API: &client.JobAPI{
Expand All @@ -74,11 +83,14 @@ func Run(ctx context.Context) {
&api.APIManager{
Client: &http.Client{Timeout: config.GetDuration("HttpClient.regulationWorker.timeout", 30, time.Second)},
DestTransformURL: config.MustGetString("DEST_TRANSFORM_URL"),
// OAuthResponseHandler instance
sanpj2292 marked this conversation as resolved.
Show resolved Hide resolved
OAuth: OAuth,
}),
}

pkgLogger.Infof("calling looper with service: %v", svc)
l := withLoop(svc)

err = l.Loop(ctx)
if err != nil && !errors.Is(err, context.Canceled) {
pkgLogger.Errorf("error: %v", err)
Expand Down
72 changes: 71 additions & 1 deletion regulation-worker/internal/delete/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,20 @@ import (
"strings"

"github.com/rudderlabs/rudder-server/regulation-worker/internal/model"
oauth "github.com/rudderlabs/rudder-server/router/oauthResponseHandler"
"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/utils/logger"
)

var (
pkgLogger = logger.NewLogger().Child("api")
supportedDestinations = []string{"BRAZE", "AM", "INTERCOM", "CLEVERTAP", "AF", "MP"}
supportedDestinations = []string{"BRAZE", "AM", "INTERCOM", "CLEVERTAP", "AF", "MP", "GA"}
)

type APIManager struct {
Client *http.Client
DestTransformURL string
OAuth oauth.Authorizer
}

// prepares payload based on (job,destDetail) & make an API call to transformer.
Expand All @@ -39,6 +41,31 @@ func (api *APIManager) Delete(ctx context.Context, job model.Job, destConfig map

bodySchema := mapJobToPayload(job, strings.ToLower(destName), destConfig)
pkgLogger.Debugf("payload: %#v", bodySchema)
pkgLogger.Infof("Regulation Delete API called:\n")

var tokenStatusCode int
var accountSecretInfo *oauth.AuthResponse
// identifier to know if the destination supports OAuth
// TODO: "rudderAccountId" has to change to "rudderUserDeleteAccountId"
rudderUserDeleteAccountId, delAccountIdExists := destConfig["rudderAccountId"]
saikumarrs marked this conversation as resolved.
Show resolved Hide resolved
// TODO: This needs to be changed
isOauthEnabled := delAccountIdExists || destName == "GA"
saikumarrs marked this conversation as resolved.
Show resolved Hide resolved
if isOauthEnabled {
// Fetch Token call
// Get Access Token Information to send it as part of the event
tokenStatusCode, accountSecretInfo = api.OAuth.FetchToken(&oauth.RefreshTokenParams{
AccountId: rudderUserDeleteAccountId.(string),
WorkspaceId: job.WorkspaceID,
DestDefName: destName,
EventNamePrefix: "fetch_token",
})
pkgLogger.Infof(`[%s][FetchToken] Token Fetch Method finished (statusCode, value): (%v, %+v)`, destName, tokenStatusCode, accountSecretInfo)
if tokenStatusCode != http.StatusOK {
pkgLogger.Errorf(`[%s][FetchToken] Error in Token Fetch statusCode: %d\t error: %s\n`, destName, tokenStatusCode, accountSecretInfo.Err)
}
} else {
pkgLogger.Errorf("[%v] Destination probably doesn't support OAuth or some issue happened while doing OAuth for deletion", destName)
koladilip marked this conversation as resolved.
Show resolved Hide resolved
}
saikumarrs marked this conversation as resolved.
Show resolved Hide resolved

reqBody, err := json.Marshal(bodySchema)
if err != nil {
Expand All @@ -53,6 +80,16 @@ func (api *APIManager) Delete(ctx context.Context, job model.Job, destConfig map
}
req.Header.Set("Content-Type", "application/json")

// setting oauth related information
if isOauthEnabled {
payload, unmarshalErr := json.Marshal(accountSecretInfo.Account)
saikumarrs marked this conversation as resolved.
Show resolved Hide resolved
if unmarshalErr != nil {
pkgLogger.Errorf("error while unmarshalling account secret information: %v", unmarshalErr)
saikumarrs marked this conversation as resolved.
Show resolved Hide resolved
return model.JobStatusFailed
}
req.Header.Set("X-Rudder-Dest-Info", string(payload))
}

fileCleaningTime := stats.Default.NewTaggedStat("file_cleaning_time", stats.TimerType, stats.Tags{
"jobId": fmt.Sprintf("%d", job.ID),
"workspaceId": job.WorkspaceID,
Expand Down Expand Up @@ -82,6 +119,39 @@ func (api *APIManager) Delete(ctx context.Context, job model.Job, destConfig map
pkgLogger.Errorf("error while decoding response body: %v", err)
return model.JobStatusFailed
}
// Refresh Flow Start
var isRefresh bool
for _, jobResponse := range jobResp {
koladilip marked this conversation as resolved.
Show resolved Hide resolved
isRefresh = strings.TrimSpace(jobResponse.AuthErrorCategory) != "" && jobResponse.AuthErrorCategory == oauth.REFRESH_TOKEN
saikumarrs marked this conversation as resolved.
Show resolved Hide resolved
if isRefresh {
break
}
}
if isRefresh {
pkgLogger.Infof("Refresh flow triggered for %v\n", destName)
// Refresh OAuth flow
var refSecret *oauth.AuthResponse
var errCatStatusCode int
refTokenParams := &oauth.RefreshTokenParams{
Secret: accountSecretInfo.Account.Secret,
WorkspaceId: job.WorkspaceID,
AccountId: rudderUserDeleteAccountId.(string),
DestDefName: destName,
EventNamePrefix: "refresh_token",
}
errCatStatusCode, refSecret = api.OAuth.RefreshToken(refTokenParams)
saikumarrs marked this conversation as resolved.
Show resolved Hide resolved
// TODO: Does it make sense to have disable destination here ?
refSec := *refSecret
if strings.TrimSpace(refSec.Err) != "" {
// There is an error occurring
pkgLogger.Warnf("Error: %v, Status: %v", refSec.Err, errCatStatusCode)
return model.JobStatusAborted
}
// Refresh is complete, the job has to be re-tried
return model.JobStatusFailed
}
// Refresh Flow ends

switch resp.StatusCode {

case http.StatusOK:
Expand Down
5 changes: 3 additions & 2 deletions regulation-worker/internal/delete/api/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type apiDeletionPayloadSchema struct {
}

type JobRespSchema struct {
Status string `json:"status"`
Error string `json:"error"`
Status string `json:"status"`
Error string `json:"error"`
AuthErrorCategory string `json:"authErrorCategory"`
}
2 changes: 2 additions & 0 deletions regulation-worker/internal/destination/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ func (d *DestMiddleware) GetDestDetails(ctx context.Context, destID string) (mod
destDetail.Config = dest.Config
destDetail.DestinationID = dest.ID
destDetail.Name = dest.DestinationDefinition.Name
// Destination Definition Config would most likely be needed
sanpj2292 marked this conversation as resolved.
Show resolved Hide resolved
destDetail.DestDefConfig = dest.DestinationDefinition.Config
pkgLogger.Debugf("obtained destination detail: %v", destDetail)
return destDetail, nil
}
Expand Down
1 change: 1 addition & 0 deletions regulation-worker/internal/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type User struct {

type Destination struct {
Config map[string]interface{}
DestDefConfig map[string]interface{}
saurav-malani marked this conversation as resolved.
Show resolved Hide resolved
DestinationID string
Name string
}
Expand Down