diff --git a/regulation-worker/internal/delete/api/api.go b/regulation-worker/internal/delete/api/api.go index f94a4dd4b0..c78f45afc9 100644 --- a/regulation-worker/internal/delete/api/api.go +++ b/regulation-worker/internal/delete/api/api.go @@ -73,6 +73,7 @@ func (api *APIManager) deleteWithRetry(ctx context.Context, job model.Job, desti } err = setOAuthHeader(oAuthDetail.secretToken, req) if err != nil { + pkgLogger.Errorf("[%v] error occurred while setting oauth header for workspace: %v, destination: %v", destination.Name, job.WorkspaceID, destination.DestinationID) pkgLogger.Error(err) return model.JobStatusFailed } @@ -198,7 +199,7 @@ func setOAuthHeader(secretToken *oauth.AuthResponse, req *http.Request) error { func (api *APIManager) getOAuthDetail(destDetail *model.Destination, workspaceId string) (oauthDetail, error) { id := oauth.GetAccountId(destDetail.Config, oauth.DeleteAccountIdKey) if strings.TrimSpace(id) == "" { - return oauthDetail{}, fmt.Errorf("%v is not present for %v", oauth.DeleteAccountIdKey, destDetail.Name) + return oauthDetail{}, fmt.Errorf("[%v] Delete account ID key (%v) is not present for destination: %v", destDetail.Name, oauth.DeleteAccountIdKey, destDetail.DestinationID) } tokenStatusCode, secretToken := api.OAuth.FetchToken(&oauth.RefreshTokenParams{ AccountId: id, @@ -223,9 +224,13 @@ func (api *APIManager) refreshOAuthToken(destName, workspaceId string, oAuthDeta DestDefName: destName, EventNamePrefix: "refresh_token", } - statusCode, _ := api.OAuth.RefreshToken(refTokenParams) + statusCode, refreshResponse := api.OAuth.RefreshToken(refTokenParams) if statusCode != http.StatusOK { - return fmt.Errorf("failed to refresh token for destination: %v", destName) + var refreshRespErr string + if refreshResponse != nil { + refreshRespErr = refreshResponse.Err + } + return fmt.Errorf("[%v] Failed to refresh token for destination in workspace(%v) & account(%v) with %v", destName, workspaceId, oAuthDetail.id, refreshRespErr) } return nil } diff --git a/regulation-worker/internal/delete/api/api_test.go b/regulation-worker/internal/delete/api/api_test.go index 26b719c9b4..3c3af8cdfe 100644 --- a/regulation-worker/internal/delete/api/api_test.go +++ b/regulation-worker/internal/delete/api/api_test.go @@ -208,18 +208,17 @@ func TestOAuth(t *testing.T) { mockBackendConfig.EXPECT().AccessToken().AnyTimes() tests := []struct { - name string - job model.Job - dest model.Destination - destConfig map[string]interface{} - destName string - respCode int - respBodyStatus model.JobStatus - authErrorCategory string - respBodyErr error - cpResponses []cpResponseParams - expectedDeleteStatus model.JobStatus - expectedPayload string + name string + job model.Job + dest model.Destination + destConfig map[string]interface{} + destName string + respBodyErr error + cpResponses []cpResponseParams + deleteResponses []deleteResponseParams + oauthHttpClientTimeout time.Duration + expectedDeleteStatus model.JobStatus + expectedPayload string }{ { name: "test with a valid token and successful response", @@ -252,6 +251,7 @@ func TestOAuth(t *testing.T) { }, }, dest: model.Destination{ + DestinationID: "1234", Config: map[string]interface{}{ "rudderDeleteAccountId": "xyz", }, @@ -262,19 +262,23 @@ func TestOAuth(t *testing.T) { }, }, }, - respCode: 200, + deleteResponses: []deleteResponseParams{ + { + status: 200, + jobResponse: `[{"status":"successful"}]`, + }, + }, cpResponses: []cpResponseParams{ { code: 200, response: `{"secret": {"access_token": "valid_access_token","refresh_token":"valid_refresh_token"}}`, }, }, - respBodyStatus: "complete", expectedDeleteStatus: model.JobStatusComplete, expectedPayload: `[{"jobId":"1","destType":"ga","config":{"rudderDeleteAccountId":"xyz"},"userAttributes":[{"email":"dorowane8n285680461479465450293437@gmail.com","phone":"6463633841","randomKey":"randomValue","userId":"Jermaine1473336609491897794707338"},{"email":"dshirilad853601942465969121327991@gmail.com","userId":"Mercie8221821544021583104106123"},{"phone":"8782905113","userId":"Claiborn443446989226249191822329"}]}]`, }, { - name: "test with an expired token and validate if token is getting changed", + name: "when 1st time fails with expired token after refresh, immediate retry of job should pass the job", job: model.Job{ ID: 2, WorkspaceID: "1001", @@ -298,6 +302,7 @@ func TestOAuth(t *testing.T) { }, }, dest: model.Destination{ + DestinationID: "1234", Config: map[string]interface{}{ "rudderDeleteAccountId": "xyz", }, @@ -308,9 +313,16 @@ func TestOAuth(t *testing.T) { }, }, }, - respCode: 500, - respBodyStatus: "failed", - authErrorCategory: oauth.REFRESH_TOKEN, + deleteResponses: []deleteResponseParams{ + { + status: 500, + jobResponse: `[{"status":"failed","authErrorCategory":"REFRESH_TOKEN", "error": "[GA] invalid credentials"}]`, + }, + { + status: 200, + jobResponse: `[{"status":"successful"}]`, + }, + }, cpResponses: []cpResponseParams{ { @@ -327,9 +339,9 @@ func TestOAuth(t *testing.T) { expectedPayload: `[{"jobId":"2","destType":"ga","config":{"rudderDeleteAccountId":"xyz"},"userAttributes":[{"email":"dorowane8n285680461479465450293438@gmail.com","phone":"6463633841","randomKey":"randomValue","userId":"Jermaine1473336609491897794707338"},{"email":"dshirilad8536019424659691213279982@gmail.com","userId":"Mercie8221821544021583104106123"}]}]`, }, { - name: "test with an expired token, when refresh throws 500 validate if job failed", + name: "test when fetch token fails(with 500) to respond properly fail the job", job: model.Job{ - ID: 4, + ID: 3, WorkspaceID: "1001", DestinationID: "1234", Status: model.JobStatusPending, @@ -338,19 +350,20 @@ func TestOAuth(t *testing.T) { ID: "Jermaine1473336609491897794707338", Attributes: map[string]string{ "phone": "6463633841", - "email": "dorowane8n285680461479465450293439@gmail.com", + "email": "dorowane8n285680461479465450293448@gmail.com", "randomKey": "randomValue", }, }, { ID: "Mercie8221821544021583104106123", Attributes: map[string]string{ - "email": "dshirilad8536019424659691213279985@gmail.com", + "email": "dshirilad8536019424659691213279983@gmail.com", }, }, }, }, dest: model.Destination{ + DestinationID: "1234", Config: map[string]interface{}{ "rudderDeleteAccountId": "xyz", }, @@ -361,26 +374,20 @@ func TestOAuth(t *testing.T) { }, }, }, - respCode: 500, - respBodyStatus: "failed", - authErrorCategory: oauth.REFRESH_TOKEN, cpResponses: []cpResponseParams{ - { - code: 200, - response: `{"secret": {"access_token": "expired_access_token","refresh_token":"valid_refresh_token"}}`, - }, { code: 500, response: `Internal Server Error`, }, }, + deleteResponses: []deleteResponseParams{{}}, expectedDeleteStatus: model.JobStatusFailed, - expectedPayload: `[{"jobId":"4","destType":"ga","config":{"rudderDeleteAccountId":"xyz"},"userAttributes":[{"email":"dorowane8n285680461479465450293439@gmail.com","phone":"6463633841","randomKey":"randomValue","userId":"Jermaine1473336609491897794707338"},{"email":"dshirilad8536019424659691213279985@gmail.com","userId":"Mercie8221821544021583104106123"}]}]`, + expectedPayload: "", // since request has not gone to transformer at all! }, { - name: "test when fetch token fails(with 500) to respond properly fail the job", + name: "test when fetch token request times out fail the job", job: model.Job{ ID: 3, WorkspaceID: "1001", @@ -404,6 +411,7 @@ func TestOAuth(t *testing.T) { }, }, dest: model.Destination{ + DestinationID: "1234", Config: map[string]interface{}{ "rudderDeleteAccountId": "xyz", }, @@ -414,34 +422,187 @@ func TestOAuth(t *testing.T) { }, }, }, - respCode: 500, - respBodyStatus: "failed", - cpResponses: []cpResponseParams{ { code: 500, response: `Internal Server Error`, + timeout: 2 * time.Second, }, }, + deleteResponses: []deleteResponseParams{{}}, + + oauthHttpClientTimeout: 1 * time.Second, expectedDeleteStatus: model.JobStatusFailed, expectedPayload: "", // since request has not gone to transformer at all! }, + { + // In this case the request will not even reach transformer, as OAuth is required but we don't have "rudderDeleteAccountId" + name: "when rudderDeleteAccountId is present but is empty string in destination config fail the job", + job: model.Job{ + ID: 1, + WorkspaceID: "1001", + DestinationID: "1234", + Status: model.JobStatusPending, + Users: []model.User{ + { + ID: "Jermaine1473336609491897794707338", + Attributes: map[string]string{ + "phone": "6463633841", + "email": "dorowane8n285680461479465450293437@gmail.com", + "randomKey": "randomValue", + }, + }, + { + ID: "Mercie8221821544021583104106123", + Attributes: map[string]string{ + "email": "dshirilad853601942465969121327991@gmail.com", + }, + }, + { + ID: "Claiborn443446989226249191822329", + Attributes: map[string]string{ + "phone": "8782905113", + }, + }, + }, + }, + dest: model.Destination{ + DestinationID: "1234", + Config: map[string]interface{}{ + "rudderDeleteAccountId": "", + }, + Name: "GA", + DestDefConfig: map[string]interface{}{ + "auth": map[string]interface{}{ + "type": "OAuth", + }, + }, + }, + cpResponses: []cpResponseParams{}, + deleteResponses: []deleteResponseParams{{}}, + expectedDeleteStatus: model.JobStatusFailed, + expectedPayload: "", + }, + { + // In this case the request will not even reach transformer, as OAuth is required but we don't have "rudderDeleteAccountId" + name: "when rudderDeleteAccountId field is not present in destination config fail the job", + job: model.Job{ + ID: 1, + WorkspaceID: "1001", + DestinationID: "1234", + Status: model.JobStatusPending, + Users: []model.User{ + { + ID: "Jermaine1473336609491897794707338", + Attributes: map[string]string{ + "phone": "6463633841", + "email": "dorowane8n285680461479465450293437@gmail.com", + "randomKey": "randomValue", + }, + }, + { + ID: "Mercie8221821544021583104106123", + Attributes: map[string]string{ + "email": "dshirilad853601942465969121327991@gmail.com", + }, + }, + { + ID: "Claiborn443446989226249191822329", + Attributes: map[string]string{ + "phone": "8782905113", + }, + }, + }, + }, + dest: model.Destination{ + DestinationID: "1234", + Config: map[string]interface{}{}, + Name: "GA", + DestDefConfig: map[string]interface{}{ + "auth": map[string]interface{}{ + "type": "OAuth", + }, + }, + }, + cpResponses: []cpResponseParams{}, + deleteResponses: []deleteResponseParams{{}}, + expectedDeleteStatus: model.JobStatusFailed, + expectedPayload: "", + }, + { + name: "test when refresh token request times out, retry once and pass if cfg-be server is up", + job: model.Job{ + ID: 9, + WorkspaceID: "1001", + DestinationID: "1234", + Status: model.JobStatusPending, + Users: []model.User{ + { + ID: "Jermaine9", + Attributes: map[string]string{ + "phone": "6463633841", + "email": "dorowane9@gmail.com", + "randomKey": "randomValue", + }, + }, + { + ID: "Mercie9", + Attributes: map[string]string{ + "email": "dshirilad9@gmail.com", + }, + }, + }, + }, + dest: model.Destination{ + DestinationID: "1234", + Config: map[string]interface{}{ + "rudderDeleteAccountId": "xyz", + }, + Name: "GA", + DestDefConfig: map[string]interface{}{ + "auth": map[string]interface{}{ + "type": "OAuth", + }, + }, + }, + + oauthHttpClientTimeout: 1 * time.Second, + cpResponses: []cpResponseParams{ + { + code: 200, + response: `{"secret": {"access_token": "expired_access_token","refresh_token":"valid_refresh_token"}}`, + }, + { + code: 500, + response: `Internal Server Error`, + timeout: 2 * time.Second, + }, + }, + deleteResponses: []deleteResponseParams{ + { + status: 500, + jobResponse: `[{"status":"failed","authErrorCategory":"REFRESH_TOKEN","error":"[GA] invalid credentials"}]`, + }, + }, + + expectedDeleteStatus: model.JobStatusFailed, + expectedPayload: `[{"jobId":"9","destType":"ga","config":{"rudderDeleteAccountId":"xyz"},"userAttributes":[{"email":"dorowane9@gmail.com","phone":"6463633841","randomKey":"randomValue","userId":"Jermaine9"},{"email":"dshirilad9@gmail.com","userId":"Mercie9"}]}]`, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - d := deleteAPI{ - respStatusCode: tt.respCode, - authErrCategory: tt.authErrorCategory, - } ctx := context.Background() - svr := httptest.NewServer(d.handler()) cpRespProducer := &cpResponseProducer{ responses: tt.cpResponses, } + deleteRespProducer := &deleteResponseProducer{ + responses: tt.deleteResponses, + } cfgBeSrv := httptest.NewServer(cpRespProducer.mockCpRequests()) + svr := httptest.NewServer(deleteRespProducer.mockDeleteRequests()) defer svr.Close() defer cfgBeSrv.Close() @@ -452,7 +613,7 @@ func TestOAuth(t *testing.T) { backendconfig.Init() oauth.Init() - OAuth := oauth.NewOAuthErrorHandler(mockBackendConfig, oauth.WithRudderFlow(oauth.RudderFlow_Delete)) + OAuth := oauth.NewOAuthErrorHandler(mockBackendConfig, oauth.WithRudderFlow(oauth.RudderFlow_Delete), oauth.WithOAuthClientTimeout(tt.oauthHttpClientTimeout)) api := api.APIManager{ Client: &http.Client{}, DestTransformURL: svr.URL, @@ -462,7 +623,8 @@ func TestOAuth(t *testing.T) { status := api.Delete(ctx, tt.job, tt.dest) require.Equal(t, tt.expectedDeleteStatus, status) - require.Equal(t, tt.expectedPayload, d.payload) + // TODO: Compare input payload for all "/deleteUsers" requests + require.Equal(t, tt.expectedPayload, deleteRespProducer.GetCurrent().actualPayload) }) } } @@ -516,3 +678,59 @@ func (cpRespProducer *cpResponseProducer) mockCpRequests() *mux.Router { }) return srvMux } + +// This part is to support multiple responses from deleteMockServer as we have retry mechanism embedded for OAuth +type deleteResponseParams struct { + status int + timeout time.Duration + jobResponse string // should be in structure of []api.JobRespSchema + actualPayload string +} +type deleteResponseProducer struct { + responses []deleteResponseParams + callCount int +} + +func (s *deleteResponseProducer) GetCurrent() *deleteResponseParams { + if s.callCount == 0 { + return &s.responses[s.callCount] + } + return &s.responses[s.callCount-1] +} + +func (s *deleteResponseProducer) GetNext() *deleteResponseParams { + if s.callCount >= len(s.responses) { + panic("ran out of responses") + } + deleteResp := &s.responses[s.callCount] + s.callCount++ + return deleteResp +} + +func (delRespProducer *deleteResponseProducer) mockDeleteRequests() *mux.Router { + srvMux := mux.NewRouter() + srvMux.HandleFunc("/deleteUsers", func(w http.ResponseWriter, req *http.Request) { + buf := new(bytes.Buffer) + _, bufErr := buf.ReadFrom(req.Body) + if bufErr != nil { + http.Error(w, bufErr.Error(), http.StatusInternalServerError) + } + // useful in validating the payload(sent in request body to transformer) + delRespProducer.GetCurrent().actualPayload = buf.String() + + delResp := delRespProducer.GetNext() + // sleep is being used to mimic the waiting in actual transformer response + if delResp.timeout > 0 { + time.Sleep(delResp.timeout) + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(delResp.status) + // Lint error fix + _, err := w.Write([]byte(delResp.jobResponse)) + if err != nil { + fmt.Printf("I'm here!!!! Some shitty response!!") + http.Error(w, fmt.Sprintf("Provided response is faulty, please check it. Err: %v", err.Error()), http.StatusInternalServerError) + } + }).Methods(http.MethodPost) + return srvMux +}