Skip to content

Commit

Permalink
fix: regulation-worker changes for oauth destinations (#2730)
Browse files Browse the repository at this point in the history
* fix: oauth retry restricted to only one re-try attempt in case of refresh failure

* - add control-plane request timeout stat
- update error message to include actual response
- update naming
- making maxRetryAttempts as configurable
- include jobId in logs

* rename the attempts argument

* fix: info logs to debug deployment issue in dev

* Add logs

* revert logs

* revert to Debug logs

Co-authored-by: Sai Sankeerth <[email protected]>
  • Loading branch information
sanpj2292 and Sai Sankeerth authored Nov 28, 2022
1 parent 9f006e5 commit 0ed5a82
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 20 deletions.
7 changes: 4 additions & 3 deletions regulation-worker/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,10 @@ func Run(ctx context.Context) {
FilesLimit: config.GetInt("REGULATION_WORKER_FILES_LIMIT", 1000),
},
&api.APIManager{
Client: &http.Client{Timeout: config.GetDuration("HttpClient.regulationWorker.transformer.timeout", 60, time.Second)},
DestTransformURL: config.MustGetString("DEST_TRANSFORM_URL"),
OAuth: OAuth,
Client: &http.Client{Timeout: config.GetDuration("HttpClient.regulationWorker.transformer.timeout", 60, time.Second)},
DestTransformURL: config.MustGetString("DEST_TRANSFORM_URL"),
OAuth: OAuth,
MaxOAuthRefreshRetryAttempts: config.GetInt("RegulationWorker.oauth.maxRefreshRetryAttempts", 1),
}),
}

Expand Down
25 changes: 15 additions & 10 deletions regulation-worker/internal/delete/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ var (
)

type APIManager struct {
Client *http.Client
DestTransformURL string
OAuth oauth.Authorizer
Client *http.Client
DestTransformURL string
OAuth oauth.Authorizer
MaxOAuthRefreshRetryAttempts int
}

type oauthDetail struct {
Expand All @@ -41,9 +42,7 @@ func (*APIManager) GetSupportedDestinations() []string {
return supportedDestinations
}

// prepares payload based on (job,destDetail) & make an API call to transformer.
// gets (status, failure_reason) which is converted to appropriate model.Error & returned to caller.
func (api *APIManager) Delete(ctx context.Context, job model.Job, destination model.Destination) model.JobStatus {
func (api *APIManager) deleteWithRetry(ctx context.Context, job model.Job, destination model.Destination, currentOauthRetryAttempt int) model.JobStatus {
pkgLogger.Debugf("deleting: %v", job, " from API destination: %v", destination.Name)
method := http.MethodPost
endpoint := "/deleteUsers"
Expand Down Expand Up @@ -106,22 +105,28 @@ func (api *APIManager) Delete(ctx context.Context, job model.Job, destination mo
return model.JobStatusFailed
}
jobStatus := getJobStatus(resp.StatusCode, jobResp)
pkgLogger.Debugf("[%v] JobStatus for %v: %v", destination.Name, destination.DestinationID, jobStatus)
pkgLogger.Debugf("[%v] Job: %v, JobStatus: %v", destination.Name, job.ID, jobStatus)

if isOAuthEnabled && isTokenExpired(jobResp) {
if isOAuthEnabled && isTokenExpired(jobResp) && currentOauthRetryAttempt < api.MaxOAuthRefreshRetryAttempts {
err = api.refreshOAuthToken(destination.Name, job.WorkspaceID, oAuthDetail)
if err != nil {
pkgLogger.Error(err)
return model.JobStatusFailed
}
// retry the request
pkgLogger.Debug("Retrying deleteRequest job for the whole batch")
return api.Delete(ctx, job, destination)
pkgLogger.Infof("[%v] Retrying deleteRequest job(id: %v) for the whole batch, RetryAttempt: %v", destination.Name, job.ID, currentOauthRetryAttempt+1)
return api.deleteWithRetry(ctx, job, destination, currentOauthRetryAttempt+1)
}

return jobStatus
}

// prepares payload based on (job,destDetail) & make an API call to transformer.
// gets (status, failure_reason) which is converted to appropriate model.Error & returned to caller.
func (api *APIManager) Delete(ctx context.Context, job model.Job, destination model.Destination) model.JobStatus {
return api.deleteWithRetry(ctx, job, destination, 0)
}

func getJobStatus(statusCode int, jobResp []JobRespSchema) model.JobStatus {
switch statusCode {

Expand Down
21 changes: 14 additions & 7 deletions services/oauth/oauth.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"net/http"
"os"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -318,7 +319,6 @@ func (authErrHandler *OAuthErrResHandler) fetchAccountInfoFromCp(refTokenParams
authStats.statName = fmt.Sprintf(`%v_request_latency`, refTokenParams.EventNamePrefix)
authStats.SendTimerStats(cpiCallStartTime)

authErrHandler.logger.Debugf("[%s] Got the response from Control-Plane: rt-worker-%d\n", loggerNm, refTokenParams.WorkerId)
authErrHandler.logger.Debugf("[%s] Got the response from Control-Plane: rt-worker-%d with statusCode: %d\n", loggerNm, refTokenParams.WorkerId, statusCode)

// Empty Refresh token response
Expand Down Expand Up @@ -369,7 +369,7 @@ func (authErrHandler *OAuthErrResHandler) fetchAccountInfoFromCp(refTokenParams
func getRefreshTokenErrResp(response string, accountSecret *AccountSecret) (message string) {
if err := json.Unmarshal([]byte(response), &accountSecret); err != nil {
// Some problem with AccountSecret unmarshalling
message = err.Error()
message = fmt.Sprintf("Unmarshal of response unsuccessful: %v", response)
} else if gjson.Get(response, "body.code").String() == INVALID_REFRESH_TOKEN_GRANT {
// User (or) AccessToken (or) RefreshToken has been revoked
message = INVALID_REFRESH_TOKEN_GRANT
Expand Down Expand Up @@ -515,6 +515,14 @@ func processResponse(resp *http.Response) (statusCode int, respBody string) {
}

func (authErrHandler *OAuthErrResHandler) cpApiCall(cpReq *ControlPlaneRequestT) (int, string) {
cpStatTags := stats.Tags{
"url": cpReq.Url,
"requestType": cpReq.RequestType,
"destType": cpReq.destName,
"method": cpReq.Method,
"flowType": string(authErrHandler.rudderFlowType),
}

var reqBody *bytes.Buffer
var req *http.Request
var err error
Expand All @@ -539,15 +547,14 @@ func (authErrHandler *OAuthErrResHandler) cpApiCall(cpReq *ControlPlaneRequestT)

cpApiDoTimeStart := time.Now()
res, doErr := authErrHandler.client.Do(req)
stats.Default.NewTaggedStat("cp_request_latency", stats.TimerType, stats.Tags{
"url": cpReq.Url,
"destination": cpReq.destName,
"requestType": cpReq.RequestType,
}).SendTiming(time.Since(cpApiDoTimeStart))
stats.Default.NewTaggedStat("cp_request_latency", stats.TimerType, cpStatTags).SendTiming(time.Since(cpApiDoTimeStart))
authErrHandler.logger.Debugf("[%s request] :: destination request sent\n", loggerNm)
if doErr != nil {
// Abort on receiving an error
authErrHandler.logger.Errorf("[%s request] :: destination request failed: %+v\n", loggerNm, doErr)
if os.IsTimeout(doErr) {
stats.Default.NewTaggedStat("cp_request_timeout", stats.CountType, cpStatTags)
}
return http.StatusBadRequest, doErr.Error()
}
if res.Body != nil {
Expand Down

0 comments on commit 0ed5a82

Please sign in to comment.