diff --git a/mocks/router/oauthResponseHandler/mock_oauthResponseHandler.go b/mocks/services/oauth/mock_oauth.go similarity index 70% rename from mocks/router/oauthResponseHandler/mock_oauthResponseHandler.go rename to mocks/services/oauth/mock_oauth.go index cdc731340d..0fc7e94deb 100644 --- a/mocks/router/oauthResponseHandler/mock_oauthResponseHandler.go +++ b/mocks/services/oauth/mock_oauth.go @@ -1,15 +1,15 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/rudderlabs/rudder-server/router/oauthResponseHandler (interfaces: Authorizer) +// Source: github.com/rudderlabs/rudder-server/services/oauth (interfaces: Authorizer) -// Package mocks_oauthResponseHandler is a generated GoMock package. -package mocks_oauthResponseHandler +// Package mocks_oauth is a generated GoMock package. +package mocks_oauth import ( reflect "reflect" gomock "github.com/golang/mock/gomock" backendconfig "github.com/rudderlabs/rudder-server/config/backend-config" - oauthResponseHandler "github.com/rudderlabs/rudder-server/router/oauthResponseHandler" + oauth "github.com/rudderlabs/rudder-server/services/oauth" ) // MockAuthorizer is a mock of Authorizer interface. @@ -51,11 +51,11 @@ func (mr *MockAuthorizerMockRecorder) DisableDestination(arg0, arg1, arg2 interf } // FetchToken mocks base method. -func (m *MockAuthorizer) FetchToken(arg0 *oauthResponseHandler.RefreshTokenParams) (int, *oauthResponseHandler.AuthResponse) { +func (m *MockAuthorizer) FetchToken(arg0 *oauth.RefreshTokenParams) (int, *oauth.AuthResponse) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "FetchToken", arg0) ret0, _ := ret[0].(int) - ret1, _ := ret[1].(*oauthResponseHandler.AuthResponse) + ret1, _ := ret[1].(*oauth.AuthResponse) return ret0, ret1 } @@ -66,11 +66,11 @@ func (mr *MockAuthorizerMockRecorder) FetchToken(arg0 interface{}) *gomock.Call } // RefreshToken mocks base method. -func (m *MockAuthorizer) RefreshToken(arg0 *oauthResponseHandler.RefreshTokenParams) (int, *oauthResponseHandler.AuthResponse) { +func (m *MockAuthorizer) RefreshToken(arg0 *oauth.RefreshTokenParams) (int, *oauth.AuthResponse) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RefreshToken", arg0) ret0, _ := ret[0].(int) - ret1, _ := ret[1].(*oauthResponseHandler.AuthResponse) + ret1, _ := ret[1].(*oauth.AuthResponse) return ret0, ret1 } @@ -79,15 +79,3 @@ func (mr *MockAuthorizerMockRecorder) RefreshToken(arg0 interface{}) *gomock.Cal mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RefreshToken", reflect.TypeOf((*MockAuthorizer)(nil).RefreshToken), arg0) } - -// Setup mocks base method. -func (m *MockAuthorizer) Setup() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Setup") -} - -// Setup indicates an expected call of Setup. -func (mr *MockAuthorizerMockRecorder) Setup() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Setup", reflect.TypeOf((*MockAuthorizer)(nil).Setup)) -} diff --git a/regulation-worker/cmd/main.go b/regulation-worker/cmd/main.go index 6d47fb7eb7..b57b524219 100644 --- a/regulation-worker/cmd/main.go +++ b/regulation-worker/cmd/main.go @@ -22,16 +22,20 @@ import ( "github.com/rudderlabs/rudder-server/regulation-worker/internal/initialize" "github.com/rudderlabs/rudder-server/regulation-worker/internal/service" "github.com/rudderlabs/rudder-server/services/filemanager" + "github.com/rudderlabs/rudder-server/services/oauth" "github.com/rudderlabs/rudder-server/utils/logger" "github.com/rudderlabs/rudder-server/utils/misc" ) var pkgLogger = logger.NewLogger().Child("regulation-worker") -func main() { +func init() { initialize.Init() backendconfig.Init() + oauth.Init() +} +func main() { pkgLogger.Info("starting regulation-worker") ctx, cancel := context.WithCancel(context.Background()) @@ -58,6 +62,9 @@ func Run(ctx context.Context) { if err != nil { panic(fmt.Errorf("error while getting workspaceId: %w", err)) } + // setting up oauth + OAuth := oauth.NewOAuthErrorHandler(backendconfig.DefaultBackendConfig, oauth.WithRudderFlow(oauth.RudderFlow_Delete)) + svc := service.JobSvc{ API: &client.JobAPI{ Client: &http.Client{Timeout: config.GetDuration("HttpClient.regulationWorker.regulationManager.timeout", 60, time.Second)}, @@ -75,6 +82,7 @@ func Run(ctx context.Context) { &api.APIManager{ Client: &http.Client{Timeout: config.GetDuration("HttpClient.regulationWorker.transformer.timeout", 60, time.Second)}, DestTransformURL: config.MustGetString("DEST_TRANSFORM_URL"), + OAuth: OAuth, }), } diff --git a/regulation-worker/internal/delete/api/api.go b/regulation-worker/internal/delete/api/api.go index b0db466035..3074ec211e 100644 --- a/regulation-worker/internal/delete/api/api.go +++ b/regulation-worker/internal/delete/api/api.go @@ -8,6 +8,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "io" "net/http" @@ -15,28 +16,41 @@ import ( "strings" "github.com/rudderlabs/rudder-server/regulation-worker/internal/model" + "github.com/rudderlabs/rudder-server/services/oauth" "github.com/rudderlabs/rudder-server/services/stats" "github.com/rudderlabs/rudder-server/utils/logger" ) var ( pkgLogger = logger.NewLogger().Child("api") - supportedDestinations = []string{"BRAZE", "AM", "INTERCOM", "CLEVERTAP", "AF", "MP"} + supportedDestinations = []string{"BRAZE", "AM", "INTERCOM", "CLEVERTAP", "AF", "MP", "GA"} ) type APIManager struct { Client *http.Client DestTransformURL string + OAuth oauth.Authorizer +} + +type oauthDetail struct { + secretToken *oauth.AuthResponse + id string +} + +func (*APIManager) GetSupportedDestinations() []string { + return supportedDestinations } // prepares payload based on (job,destDetail) & make an API call to transformer. // gets (status, failure_reason) which is converted to appropriate model.Error & returned to caller. -func (api *APIManager) Delete(ctx context.Context, job model.Job, destConfig map[string]interface{}, destName string) model.JobStatus { +func (api *APIManager) Delete(ctx context.Context, job model.Job, destination model.Destination) model.JobStatus { + pkgLogger.Debugf("deleting: %v", job, " from API destination: %v", destination.Name) method := http.MethodPost endpoint := "/deleteUsers" url := fmt.Sprint(api.DestTransformURL, endpoint) - bodySchema := mapJobToPayload(job, strings.ToLower(destName), destConfig) + bodySchema := mapJobToPayload(job, strings.ToLower(destination.Name), destination.Config) + pkgLogger.Debugf("payload: %#v", bodySchema) reqBody, err := json.Marshal(bodySchema) if err != nil { @@ -49,11 +63,27 @@ func (api *APIManager) Delete(ctx context.Context, job model.Job, destConfig map } req.Header.Set("Content-Type", "application/json") + // check if OAuth destination + isOAuthEnabled := oauth.GetAuthType(destination.DestDefConfig) == oauth.OAuth + var oAuthDetail oauthDetail + if isOAuthEnabled { + oAuthDetail, err = api.getOAuthDetail(&destination, job.WorkspaceID) + if err != nil { + pkgLogger.Error(err) + return model.JobStatusFailed + } + err = setOAuthHeader(oAuthDetail.secretToken, req) + if err != nil { + pkgLogger.Error(err) + return model.JobStatusFailed + } + } + fileCleaningTime := stats.Default.NewTaggedStat("file_cleaning_time", stats.TimerType, stats.Tags{ "jobId": fmt.Sprintf("%d", job.ID), "workspaceId": job.WorkspaceID, "destType": "api", - "destName": strings.ToLower(destName), + "destName": strings.ToLower(destination.Name), }) fileCleaningTime.Start() defer fileCleaningTime.End() @@ -75,8 +105,25 @@ func (api *APIManager) Delete(ctx context.Context, job model.Job, destConfig map if err := json.Unmarshal(bodyBytes, &jobResp); err != nil { return model.JobStatusFailed } + jobStatus := getJobStatus(resp.StatusCode, jobResp) + pkgLogger.Debugf("[%v] JobStatus for %v: %v", destination.Name, destination.DestinationID, jobStatus) + + if isOAuthEnabled && isTokenExpired(jobResp) { + err = api.refreshOAuthToken(destination.Name, job.WorkspaceID, oAuthDetail) + if err != nil { + pkgLogger.Error(err) + return model.JobStatusFailed + } + // retry the request + pkgLogger.Debug("Retrying deleteRequest job for the whole batch") + return api.Delete(ctx, job, destination) + } + + return jobStatus +} - switch resp.StatusCode { +func getJobStatus(statusCode int, jobResp []JobRespSchema) model.JobStatus { + switch statusCode { case http.StatusOK: return model.JobStatusComplete @@ -103,10 +150,6 @@ func (api *APIManager) Delete(ctx context.Context, job model.Job, destConfig map } } -func (*APIManager) GetSupportedDestinations() []string { - return supportedDestinations -} - func mapJobToPayload(job model.Job, destName string, destConfig map[string]interface{}) []apiDeletionPayloadSchema { uas := make([]userAttributesSchema, len(job.Users)) for i, ua := range job.Users { @@ -126,3 +169,58 @@ func mapJobToPayload(job model.Job, destName string, destConfig map[string]inter }, } } + +func isTokenExpired(jobResponses []JobRespSchema) bool { + for _, jobResponse := range jobResponses { + if jobResponse.AuthErrorCategory == oauth.REFRESH_TOKEN { + return true + } + } + return false +} + +func setOAuthHeader(secretToken *oauth.AuthResponse, req *http.Request) error { + payload, marshalErr := json.Marshal(secretToken.Account) + if marshalErr != nil { + marshalFailErr := fmt.Sprintf("error while marshalling account secret information: %v", marshalErr) + pkgLogger.Errorf(marshalFailErr) + return errors.New(marshalFailErr) + } + req.Header.Set("X-Rudder-Dest-Info", string(payload)) + return nil +} + +func (api *APIManager) getOAuthDetail(destDetail *model.Destination, workspaceId string) (oauthDetail, error) { + id := oauth.GetAccountId(destDetail.Config, oauth.DeleteAccountIdKey) + if strings.TrimSpace(id) == "" { + return oauthDetail{}, fmt.Errorf("%v is not present for %v", oauth.DeleteAccountIdKey, destDetail.Name) + } + tokenStatusCode, secretToken := api.OAuth.FetchToken(&oauth.RefreshTokenParams{ + AccountId: id, + WorkspaceId: workspaceId, + DestDefName: destDetail.Name, + EventNamePrefix: "fetch_token", + }) + if tokenStatusCode != http.StatusOK { + return oauthDetail{}, fmt.Errorf("[%s][FetchToken] Error in Token Fetch statusCode: %d\t error: %s", destDetail.Name, tokenStatusCode, secretToken.Err) + } + return oauthDetail{ + id: id, + secretToken: secretToken, + }, nil +} + +func (api *APIManager) refreshOAuthToken(destName, workspaceId string, oAuthDetail oauthDetail) error { + refTokenParams := &oauth.RefreshTokenParams{ + Secret: oAuthDetail.secretToken.Account.Secret, + WorkspaceId: workspaceId, + AccountId: oAuthDetail.id, + DestDefName: destName, + EventNamePrefix: "refresh_token", + } + statusCode, _ := api.OAuth.RefreshToken(refTokenParams) + if statusCode != http.StatusOK { + return fmt.Errorf("failed to refresh token for destination: %v", destName) + } + return nil +} diff --git a/regulation-worker/internal/delete/api/api_test.go b/regulation-worker/internal/delete/api/api_test.go index 9ec3805198..26b719c9b4 100644 --- a/regulation-worker/internal/delete/api/api_test.go +++ b/regulation-worker/internal/delete/api/api_test.go @@ -3,14 +3,20 @@ package api_test import ( "bytes" "encoding/json" + "fmt" "net/http" "net/http/httptest" "testing" + "time" + "github.com/golang/mock/gomock" "github.com/gorilla/mux" + backendconfig "github.com/rudderlabs/rudder-server/config/backend-config" + mocksBackendConfig "github.com/rudderlabs/rudder-server/mocks/config/backend-config" "github.com/rudderlabs/rudder-server/regulation-worker/internal/delete/api" "github.com/rudderlabs/rudder-server/regulation-worker/internal/initialize" "github.com/rudderlabs/rudder-server/regulation-worker/internal/model" + "github.com/rudderlabs/rudder-server/services/oauth" "github.com/stretchr/testify/require" "golang.org/x/net/context" @@ -142,7 +148,11 @@ func TestDelete(t *testing.T) { Client: &http.Client{}, DestTransformURL: svr.URL, } - status := api.Delete(ctx, tt.job, tt.destConfig, tt.destName) + dest := model.Destination{ + Config: tt.destConfig, + Name: tt.destName, + } + status := api.Delete(ctx, tt.job, dest) require.Equal(t, tt.expectedDeleteStatus, status) require.Equal(t, tt.expectedPayload, d.payload) }) @@ -150,10 +160,11 @@ func TestDelete(t *testing.T) { } type deleteAPI struct { - payload string - respStatusCode int - respBodyStatus model.JobStatus - respBodyErr error + payload string + respStatusCode int + respBodyStatus model.JobStatus + respBodyErr error + authErrCategory string } func (d *deleteAPI) deleteMockServer(w http.ResponseWriter, r *http.Request) { @@ -173,6 +184,9 @@ func (d *deleteAPI) deleteMockServer(w http.ResponseWriter, r *http.Request) { if d.respBodyErr != nil { resp.Error = d.respBodyErr.Error() } + if d.authErrCategory != "" { + resp.AuthErrorCategory = d.authErrCategory + } body, err := json.Marshal([]api.JobRespSchema{resp}) if err != nil { @@ -185,3 +199,320 @@ func (d *deleteAPI) deleteMockServer(w http.ResponseWriter, r *http.Request) { http.Error(w, err.Error(), http.StatusInternalServerError) } } + +func TestOAuth(t *testing.T) { + initialize.Init() + + mockCtrl := gomock.NewController(t) + mockBackendConfig := mocksBackendConfig.NewMockBackendConfig(mockCtrl) + mockBackendConfig.EXPECT().AccessToken().AnyTimes() + + tests := []struct { + name string + job model.Job + dest model.Destination + destConfig map[string]interface{} + destName string + respCode int + respBodyStatus model.JobStatus + authErrorCategory string + respBodyErr error + cpResponses []cpResponseParams + expectedDeleteStatus model.JobStatus + expectedPayload string + }{ + { + name: "test with a valid token and successful response", + job: model.Job{ + ID: 1, + WorkspaceID: "1001", + DestinationID: "1234", + Status: model.JobStatusPending, + Users: []model.User{ + { + ID: "Jermaine1473336609491897794707338", + Attributes: map[string]string{ + "phone": "6463633841", + "email": "dorowane8n285680461479465450293437@gmail.com", + "randomKey": "randomValue", + }, + }, + { + ID: "Mercie8221821544021583104106123", + Attributes: map[string]string{ + "email": "dshirilad853601942465969121327991@gmail.com", + }, + }, + { + ID: "Claiborn443446989226249191822329", + Attributes: map[string]string{ + "phone": "8782905113", + }, + }, + }, + }, + dest: model.Destination{ + Config: map[string]interface{}{ + "rudderDeleteAccountId": "xyz", + }, + Name: "GA", + DestDefConfig: map[string]interface{}{ + "auth": map[string]interface{}{ + "type": "OAuth", + }, + }, + }, + respCode: 200, + cpResponses: []cpResponseParams{ + { + code: 200, + response: `{"secret": {"access_token": "valid_access_token","refresh_token":"valid_refresh_token"}}`, + }, + }, + respBodyStatus: "complete", + expectedDeleteStatus: model.JobStatusComplete, + expectedPayload: `[{"jobId":"1","destType":"ga","config":{"rudderDeleteAccountId":"xyz"},"userAttributes":[{"email":"dorowane8n285680461479465450293437@gmail.com","phone":"6463633841","randomKey":"randomValue","userId":"Jermaine1473336609491897794707338"},{"email":"dshirilad853601942465969121327991@gmail.com","userId":"Mercie8221821544021583104106123"},{"phone":"8782905113","userId":"Claiborn443446989226249191822329"}]}]`, + }, + { + name: "test with an expired token and validate if token is getting changed", + job: model.Job{ + ID: 2, + WorkspaceID: "1001", + DestinationID: "1234", + Status: model.JobStatusPending, + Users: []model.User{ + { + ID: "Jermaine1473336609491897794707338", + Attributes: map[string]string{ + "phone": "6463633841", + "email": "dorowane8n285680461479465450293438@gmail.com", + "randomKey": "randomValue", + }, + }, + { + ID: "Mercie8221821544021583104106123", + Attributes: map[string]string{ + "email": "dshirilad8536019424659691213279982@gmail.com", + }, + }, + }, + }, + dest: model.Destination{ + Config: map[string]interface{}{ + "rudderDeleteAccountId": "xyz", + }, + Name: "GA", + DestDefConfig: map[string]interface{}{ + "auth": map[string]interface{}{ + "type": "OAuth", + }, + }, + }, + respCode: 500, + respBodyStatus: "failed", + authErrorCategory: oauth.REFRESH_TOKEN, + + cpResponses: []cpResponseParams{ + { + code: 200, + response: `{"secret": {"access_token": "expired_access_token","refresh_token":"valid_refresh_token"}}`, + }, + { + code: 200, + response: `{"secret": {"access_token": "refreshed_access_token","refresh_token":"valid_refresh_token"}}`, + }, + }, + + expectedDeleteStatus: model.JobStatusFailed, + expectedPayload: `[{"jobId":"2","destType":"ga","config":{"rudderDeleteAccountId":"xyz"},"userAttributes":[{"email":"dorowane8n285680461479465450293438@gmail.com","phone":"6463633841","randomKey":"randomValue","userId":"Jermaine1473336609491897794707338"},{"email":"dshirilad8536019424659691213279982@gmail.com","userId":"Mercie8221821544021583104106123"}]}]`, + }, + { + name: "test with an expired token, when refresh throws 500 validate if job failed", + job: model.Job{ + ID: 4, + WorkspaceID: "1001", + DestinationID: "1234", + Status: model.JobStatusPending, + Users: []model.User{ + { + ID: "Jermaine1473336609491897794707338", + Attributes: map[string]string{ + "phone": "6463633841", + "email": "dorowane8n285680461479465450293439@gmail.com", + "randomKey": "randomValue", + }, + }, + { + ID: "Mercie8221821544021583104106123", + Attributes: map[string]string{ + "email": "dshirilad8536019424659691213279985@gmail.com", + }, + }, + }, + }, + dest: model.Destination{ + Config: map[string]interface{}{ + "rudderDeleteAccountId": "xyz", + }, + Name: "GA", + DestDefConfig: map[string]interface{}{ + "auth": map[string]interface{}{ + "type": "OAuth", + }, + }, + }, + respCode: 500, + respBodyStatus: "failed", + authErrorCategory: oauth.REFRESH_TOKEN, + + cpResponses: []cpResponseParams{ + { + code: 200, + response: `{"secret": {"access_token": "expired_access_token","refresh_token":"valid_refresh_token"}}`, + }, + { + code: 500, + response: `Internal Server Error`, + }, + }, + + expectedDeleteStatus: model.JobStatusFailed, + expectedPayload: `[{"jobId":"4","destType":"ga","config":{"rudderDeleteAccountId":"xyz"},"userAttributes":[{"email":"dorowane8n285680461479465450293439@gmail.com","phone":"6463633841","randomKey":"randomValue","userId":"Jermaine1473336609491897794707338"},{"email":"dshirilad8536019424659691213279985@gmail.com","userId":"Mercie8221821544021583104106123"}]}]`, + }, + { + name: "test when fetch token fails(with 500) to respond properly fail the job", + job: model.Job{ + ID: 3, + WorkspaceID: "1001", + DestinationID: "1234", + Status: model.JobStatusPending, + Users: []model.User{ + { + ID: "Jermaine1473336609491897794707338", + Attributes: map[string]string{ + "phone": "6463633841", + "email": "dorowane8n285680461479465450293448@gmail.com", + "randomKey": "randomValue", + }, + }, + { + ID: "Mercie8221821544021583104106123", + Attributes: map[string]string{ + "email": "dshirilad8536019424659691213279983@gmail.com", + }, + }, + }, + }, + dest: model.Destination{ + Config: map[string]interface{}{ + "rudderDeleteAccountId": "xyz", + }, + Name: "GA", + DestDefConfig: map[string]interface{}{ + "auth": map[string]interface{}{ + "type": "OAuth", + }, + }, + }, + respCode: 500, + respBodyStatus: "failed", + + cpResponses: []cpResponseParams{ + { + code: 500, + response: `Internal Server Error`, + }, + }, + + expectedDeleteStatus: model.JobStatusFailed, + expectedPayload: "", // since request has not gone to transformer at all! + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + d := deleteAPI{ + respStatusCode: tt.respCode, + authErrCategory: tt.authErrorCategory, + } + ctx := context.Background() + svr := httptest.NewServer(d.handler()) + + cpRespProducer := &cpResponseProducer{ + responses: tt.cpResponses, + } + cfgBeSrv := httptest.NewServer(cpRespProducer.mockCpRequests()) + + defer svr.Close() + defer cfgBeSrv.Close() + + t.Setenv("DEST_TRANSFORM_URL", svr.URL) + t.Setenv("CONFIG_BACKEND_URL", cfgBeSrv.URL) + t.Setenv("CONFIG_BACKEND_TOKEN", "config_backend_token") + + backendconfig.Init() + oauth.Init() + OAuth := oauth.NewOAuthErrorHandler(mockBackendConfig, oauth.WithRudderFlow(oauth.RudderFlow_Delete)) + api := api.APIManager{ + Client: &http.Client{}, + DestTransformURL: svr.URL, + OAuth: OAuth, + } + + status := api.Delete(ctx, tt.job, tt.dest) + + require.Equal(t, tt.expectedDeleteStatus, status) + require.Equal(t, tt.expectedPayload, d.payload) + }) + } +} + +type cpResponseParams struct { + timeout time.Duration + code int + response string +} +type cpResponseProducer struct { + responses []cpResponseParams + callCount int +} + +func (s *cpResponseProducer) GetNext() cpResponseParams { + if s.callCount >= len(s.responses) { + panic("ran out of responses") + } + cpResp := s.responses[s.callCount] + s.callCount++ + return cpResp +} + +func (cpRespProducer *cpResponseProducer) mockCpRequests() *mux.Router { + srvMux := mux.NewRouter() + srvMux.HandleFunc("/destination/workspaces/{workspaceId}/accounts/{accountId}/token", func(w http.ResponseWriter, req *http.Request) { + vars := mux.Vars(req) + // iterating over request parameters + for _, reqParam := range []string{"workspaceId", "accountId"} { + _, ok := vars[reqParam] + if !ok { + // This case wouldn't occur I guess + http.Error(w, fmt.Sprintf("Wrong url being sent: %v", reqParam), http.StatusInternalServerError) + return + } + } + + cpResp := cpRespProducer.GetNext() + // sleep is being used to mimic the waiting in actual transformer response + if cpResp.timeout > 0 { + time.Sleep(cpResp.timeout) + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(cpResp.code) + // Lint error fix + _, err := w.Write([]byte(cpResp.response)) + if err != nil { + fmt.Printf("I'm here!!!! Some shitty response!!") + http.Error(w, fmt.Sprintf("Provided response is faulty, please check it. Err: %v", err.Error()), http.StatusInternalServerError) + } + }) + return srvMux +} diff --git a/regulation-worker/internal/delete/api/schema.go b/regulation-worker/internal/delete/api/schema.go index 8647d8fb54..51fec7ada0 100644 --- a/regulation-worker/internal/delete/api/schema.go +++ b/regulation-worker/internal/delete/api/schema.go @@ -10,6 +10,7 @@ type apiDeletionPayloadSchema struct { } type JobRespSchema struct { - Status string `json:"status"` - Error string `json:"error"` + Status string `json:"status"` + Error string `json:"error"` + AuthErrorCategory string `json:"authErrorCategory"` } diff --git a/regulation-worker/internal/delete/batch/batch.go b/regulation-worker/internal/delete/batch/batch.go index acb4eaf4cf..3337a68787 100644 --- a/regulation-worker/internal/delete/batch/batch.go +++ b/regulation-worker/internal/delete/batch/batch.go @@ -315,9 +315,11 @@ func (*BatchManager) GetSupportedDestinations() []string { func (bm *BatchManager) Delete( ctx context.Context, job model.Job, - destConfig map[string]interface{}, - destName string, + destDetail model.Destination, ) model.JobStatus { + destConfig := destDetail.Config + destName := destDetail.Name + pkgLogger.Debugf("deleting job: %v", job, "from batch destination: %v", destName) fm, err := bm.FMFactory.New(&filemanager.SettingsT{Provider: destName, Config: destConfig}) diff --git a/regulation-worker/internal/delete/batch/batch_test.go b/regulation-worker/internal/delete/batch/batch_test.go index df21e3a04b..1b36fc27ba 100644 --- a/regulation-worker/internal/delete/batch/batch_test.go +++ b/regulation-worker/internal/delete/batch/batch_test.go @@ -85,7 +85,7 @@ func TestBatchDelete(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - status := bm.Delete(ctx, tt.job, tt.dest.Config, tt.dest.Name) + status := bm.Delete(ctx, tt.job, tt.dest) require.Equal(t, model.JobStatusComplete, status) searchDir := mockBucketLocation diff --git a/regulation-worker/internal/delete/delete.go b/regulation-worker/internal/delete/delete.go index 76f53e3348..6ce752fb6b 100644 --- a/regulation-worker/internal/delete/delete.go +++ b/regulation-worker/internal/delete/delete.go @@ -12,7 +12,7 @@ var pkgLogger = logger.NewLogger().Child("client") //go:generate mockgen -source=delete.go -destination=mock_delete_test.go -package=delete github.com/rudderlabs/rudder-server/regulation-worker/internal/delete type deleteManager interface { - Delete(ctx context.Context, job model.Job, destConfig map[string]interface{}, destName string) model.JobStatus + Delete(ctx context.Context, job model.Job, destDetail model.Destination) model.JobStatus GetSupportedDestinations() []string } @@ -44,7 +44,7 @@ func (r *Router) Delete(ctx context.Context, job model.Job, dest model.Destinati }) if _, ok := r.router[dest.Name]; ok { pkgLogger.Debugf("calling deletion manager: %v", r.router[dest.Name]) - return r.router[dest.Name].Delete(ctx, job, dest.Config, dest.Name) + return r.router[dest.Name].Delete(ctx, job, dest) } pkgLogger.Errorf("no deletion manager support deletion from destination: %v", dest.Name) diff --git a/regulation-worker/internal/delete/delete_test.go b/regulation-worker/internal/delete/delete_test.go index e212b44163..c18c6fd8bb 100644 --- a/regulation-worker/internal/delete/delete_test.go +++ b/regulation-worker/internal/delete/delete_test.go @@ -51,7 +51,7 @@ func TestDelete(t *testing.T) { for _, tt := range testData { md1.EXPECT().GetSupportedDestinations().Return([]string{"d1", "d2"}).Times(tt.getSupportedDestinationsCallCount) md2.EXPECT().GetSupportedDestinations().Return([]string{"d3", "d4"}).Times(tt.getSupportedDestinationsCallCount) - md1.EXPECT().Delete(ctx, tt.job, tt.destDetail.Config, tt.destDetail.Name).Return(model.JobStatusComplete).Times(tt.md1CallCount) + md1.EXPECT().Delete(ctx, tt.job, tt.destDetail).Return(model.JobStatusComplete).Times(tt.md1CallCount) status := r.Delete(ctx, tt.job, tt.destDetail) require.Equal(t, tt.expectedStatus, status, "actual status different than expected") } diff --git a/regulation-worker/internal/delete/kvstore/kvstore.go b/regulation-worker/internal/delete/kvstore/kvstore.go index fe5346623f..a5c940b994 100644 --- a/regulation-worker/internal/delete/kvstore/kvstore.go +++ b/regulation-worker/internal/delete/kvstore/kvstore.go @@ -21,7 +21,10 @@ func (*KVDeleteManager) GetSupportedDestinations() []string { return supportedDestinations } -func (*KVDeleteManager) Delete(_ context.Context, job model.Job, destConfig map[string]interface{}, destName string) model.JobStatus { +func (*KVDeleteManager) Delete(_ context.Context, job model.Job, destDetail model.Destination) model.JobStatus { + destConfig := destDetail.Config + destName := destDetail.Name + pkgLogger.Debugf("deleting job: %v", job, " from kvstore") kvm := kvstoremanager.New(destName, destConfig) var err error diff --git a/regulation-worker/internal/delete/kvstore/kvstore_test.go b/regulation-worker/internal/delete/kvstore/kvstore_test.go index cc40d8e91a..58fb8db5d6 100644 --- a/regulation-worker/internal/delete/kvstore/kvstore_test.go +++ b/regulation-worker/internal/delete/kvstore/kvstore_test.go @@ -113,13 +113,15 @@ func TestRedisDeletion(t *testing.T) { }, } - destName := "REDIS" - destConfig := map[string]interface{}{ - "clusterMode": false, - "address": redisAddress, + dest := model.Destination{ + Config: map[string]interface{}{ + "clusterMode": false, + "address": redisAddress, + }, + Name: "REDIS", } - manager := kvstoremanager.New(destName, destConfig) + manager := kvstoremanager.New(dest.Name, dest.Config) // inserting test data in Redis for _, test := range inputTestData { @@ -155,7 +157,7 @@ func TestRedisDeletion(t *testing.T) { } // deleting the last key inserted - status := kvstore.Delete(ctx, deleteJob, destConfig, destName) + status := kvstore.Delete(ctx, deleteJob, dest) require.Equal(t, model.JobStatusComplete, status, "actual deletion status different than expected") fieldCountAfterDelete := make([]int, len(inputTestData)) diff --git a/regulation-worker/internal/delete/mock_delete_test.go b/regulation-worker/internal/delete/mock_delete_test.go index da0ad85e9b..72de6a7417 100644 --- a/regulation-worker/internal/delete/mock_delete_test.go +++ b/regulation-worker/internal/delete/mock_delete_test.go @@ -36,17 +36,17 @@ func (m *MockdeleteManager) EXPECT() *MockdeleteManagerMockRecorder { } // Delete mocks base method. -func (m *MockdeleteManager) Delete(ctx context.Context, job model.Job, destConfig map[string]interface{}, destName string) model.JobStatus { +func (m *MockdeleteManager) Delete(ctx context.Context, job model.Job, destDetail model.Destination) model.JobStatus { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Delete", ctx, job, destConfig, destName) + ret := m.ctrl.Call(m, "Delete", ctx, job, destDetail) ret0, _ := ret[0].(model.JobStatus) return ret0 } // Delete indicates an expected call of Delete. -func (mr *MockdeleteManagerMockRecorder) Delete(ctx, job, destConfig, destName interface{}) *gomock.Call { +func (mr *MockdeleteManagerMockRecorder) Delete(ctx, job, destDetail interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockdeleteManager)(nil).Delete), ctx, job, destConfig, destName) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockdeleteManager)(nil).Delete), ctx, job, destDetail) } // GetSupportedDestinations mocks base method. diff --git a/regulation-worker/internal/destination/destination.go b/regulation-worker/internal/destination/destination.go index 81299e62a6..41dc56801e 100644 --- a/regulation-worker/internal/destination/destination.go +++ b/regulation-worker/internal/destination/destination.go @@ -60,6 +60,8 @@ func (d *DestMiddleware) GetDestDetails(ctx context.Context, destID string) (mod destDetail.Config = dest.Config destDetail.DestinationID = dest.ID destDetail.Name = dest.DestinationDefinition.Name + // Destination Definition Config would most likely be needed + destDetail.DestDefConfig = dest.DestinationDefinition.Config pkgLogger.Debugf("obtained destination detail: %v", destDetail) return destDetail, nil } diff --git a/regulation-worker/internal/model/model.go b/regulation-worker/internal/model/model.go index 94a5ac7afd..d605cba890 100644 --- a/regulation-worker/internal/model/model.go +++ b/regulation-worker/internal/model/model.go @@ -42,6 +42,7 @@ type User struct { type Destination struct { Config map[string]interface{} + DestDefConfig map[string]interface{} DestinationID string Name string } diff --git a/router/router.go b/router/router.go index f572dabc6f..648d341912 100644 --- a/router/router.go +++ b/router/router.go @@ -25,7 +25,6 @@ import ( customDestinationManager "github.com/rudderlabs/rudder-server/router/customdestinationmanager" "github.com/rudderlabs/rudder-server/router/internal/eventorder" "github.com/rudderlabs/rudder-server/router/internal/jobiterator" - oauth "github.com/rudderlabs/rudder-server/router/oauthResponseHandler" "github.com/rudderlabs/rudder-server/router/throttler" "github.com/rudderlabs/rudder-server/router/transformer" "github.com/rudderlabs/rudder-server/router/types" @@ -34,6 +33,7 @@ import ( destinationdebugger "github.com/rudderlabs/rudder-server/services/debugger/destination" "github.com/rudderlabs/rudder-server/services/diagnostics" "github.com/rudderlabs/rudder-server/services/metric" + "github.com/rudderlabs/rudder-server/services/oauth" "github.com/rudderlabs/rudder-server/services/rsources" "github.com/rudderlabs/rudder-server/services/stats" "github.com/rudderlabs/rudder-server/services/transientsource" @@ -460,8 +460,9 @@ func (worker *workerT) workerProcess() { continue } destination := batchDestination.Destination - if authType := routerutils.GetAuthType(destination); routerutils.IsNotEmptyString(authType) && authType == "OAuth" { - rudderAccountID := routerutils.GetRudderAccountId(&destination) + if authType := oauth.GetAuthType(destination.DestinationDefinition.Config); routerutils.IsNotEmptyString(string(authType)) && authType == oauth.OAuth { + rudderAccountID := oauth.GetAccountId(destination.Config, oauth.DeliveryAccountIdKey) + if routerutils.IsNotEmptyString(rudderAccountID) { worker.rt.logger.Debugf(`[%s][FetchToken] Token Fetch Method to be called`, destination.DestinationDefinition.Name) // Get Access Token Information to send it as part of the event @@ -672,8 +673,8 @@ func (worker *workerT) processDestinationJobs() { respStatusCode, respBodyTemp, respContentType = worker.rt.transformer.ProxyRequest(ctx, proxyReqparams) worker.routerProxyStat.SendTiming(time.Since(rtlTime)) pkgLogger.Debugf(`[TransformerProxy] (Dest-%[1]v) {Job - %[2]v} Request ended`, worker.rt.destName, jobID) - authType := routerutils.GetAuthType(destinationJob.Destination) - if routerutils.IsNotEmptyString(authType) && authType == "OAuth" { + authType := oauth.GetAuthType(destinationJob.Destination.DestinationDefinition.Config) + if routerutils.IsNotEmptyString(string(authType)) && authType == oauth.OAuth { pkgLogger.Debugf(`Sending for OAuth destination`) // Token from header of the request respStatusCode, respBodyTemp = worker.rt.HandleOAuthDestResponse(&HandleDestOAuthRespParamsT{ @@ -1955,7 +1956,6 @@ func (rt *HandleT) Setup(backendConfig backendconfig.BackendConfig, jobsDB jobsd rt.transformer = transformer.NewTransformer(rt.netClientTimeout, rt.backendProxyTimeout) rt.oauth = oauth.NewOAuthErrorHandler(backendConfig) - rt.oauth.Setup() var t throttler.HandleT t.SetUp(rt.destName) @@ -2089,7 +2089,7 @@ func (rt *HandleT) HandleOAuthDestResponse(params *HandleDestOAuthRespParamsT) ( var errCatStatusCode int // Check the category // Trigger the refresh endpoint/disable endpoint - rudderAccountID := routerutils.GetRudderAccountId(&destinationJob.Destination) + rudderAccountID := oauth.GetAccountId(destinationJob.Destination.Config, oauth.DeliveryAccountIdKey) if strings.TrimSpace(rudderAccountID) == "" { return trRespStatusCode, trRespBody } @@ -2119,6 +2119,7 @@ func (rt *HandleT) HandleOAuthDestResponse(params *HandleDestOAuthRespParamsT) ( "workspaceId": refTokenParams.WorkspaceId, "accountId": refTokenParams.AccountId, "destType": refTokenParams.DestDefName, + "flowType": string(oauth.RudderFlow_Delivery), }).Increment() rt.logger.Errorf(`[OAuth request] Aborting the event as %v`, oauth.INVALID_REFRESH_TOKEN_GRANT) return disableStCd, refSec.Err @@ -2141,6 +2142,7 @@ func (rt *HandleT) ExecDisableDestination(destination *backendconfig.Destination "destType": destination.DestinationDefinition.Name, "workspaceId": workspaceID, "success": "true", + "flowType": string(oauth.RudderFlow_Delivery), } errCatStatusCode, errCatResponse := rt.oauth.DisableDestination(destination, workspaceID, rudderAccountId) if errCatStatusCode != http.StatusOK { diff --git a/router/utils/utils.go b/router/utils/utils.go index bf6b3341e5..ad40a8200d 100644 --- a/router/utils/utils.go +++ b/router/utils/utils.go @@ -99,26 +99,3 @@ func EnhanceJSON(rawMsg []byte, key, val string) []byte { func IsNotEmptyString(s string) bool { return len(strings.TrimSpace(s)) > 0 } - -func GetAuthType(dest backendconfig.DestinationT) (authType string) { - destConfig := dest.DestinationDefinition.Config - var lookupErr error - var authValue interface{} - if authValue, lookupErr = misc.NestedMapLookup(destConfig, "auth", "type"); lookupErr != nil { - return "" - } - authType, ok := authValue.(string) - if !ok { - return "" - } - return authType -} - -func GetRudderAccountId(destination *backendconfig.DestinationT) string { - if rudderAccountIdInterface, found := destination.Config["rudderAccountId"]; found { - if rudderAccountId, ok := rudderAccountIdInterface.(string); ok { - return rudderAccountId - } - } - return "" -} diff --git a/runner/runner.go b/runner/runner.go index 02e57ff34a..b1a7cb6ada 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -39,7 +39,6 @@ import ( "github.com/rudderlabs/rudder-server/router/batchrouter" "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager" "github.com/rudderlabs/rudder-server/router/customdestinationmanager" - oauth "github.com/rudderlabs/rudder-server/router/oauthResponseHandler" routertransformer "github.com/rudderlabs/rudder-server/router/transformer" batchrouterutils "github.com/rudderlabs/rudder-server/router/utils" "github.com/rudderlabs/rudder-server/services/alert" @@ -53,6 +52,7 @@ import ( destinationconnectiontester "github.com/rudderlabs/rudder-server/services/destination-connection-tester" "github.com/rudderlabs/rudder-server/services/diagnostics" "github.com/rudderlabs/rudder-server/services/multitenant" + "github.com/rudderlabs/rudder-server/services/oauth" "github.com/rudderlabs/rudder-server/services/pgnotifier" "github.com/rudderlabs/rudder-server/services/stats" "github.com/rudderlabs/rudder-server/services/streammanager/kafka" diff --git a/router/oauthResponseHandler/oauthResponseHandler.go b/services/oauth/oauth.go similarity index 87% rename from router/oauthResponseHandler/oauthResponseHandler.go rename to services/oauth/oauth.go index f3354f566c..94e243ee66 100644 --- a/router/oauthResponseHandler/oauthResponseHandler.go +++ b/services/oauth/oauth.go @@ -1,6 +1,6 @@ -package oauthResponseHandler +package oauth -//go:generate mockgen -destination=../../mocks/router/oauthResponseHandler/mock_oauthResponseHandler.go -package=mocks_oauthResponseHandler github.com/rudderlabs/rudder-server/router/oauthResponseHandler Authorizer +//go:generate mockgen -destination=../../mocks/services/oauth/mock_oauth.go -package=mocks_oauth github.com/rudderlabs/rudder-server/services/oauth Authorizer import ( "bytes" "encoding/json" @@ -17,9 +17,26 @@ import ( router_utils "github.com/rudderlabs/rudder-server/router/utils" "github.com/rudderlabs/rudder-server/services/stats" "github.com/rudderlabs/rudder-server/utils/logger" + "github.com/rudderlabs/rudder-server/utils/misc" "github.com/tidwall/gjson" ) +type ( + AuthType string + RudderFlow string +) + +const ( + OAuth AuthType = "OAuth" + InvalidAuthType AuthType = "InvalidAuthType" + + RudderFlow_Delivery RudderFlow = "delivery" + RudderFlow_Delete RudderFlow = "delete" + + DeleteAccountIdKey = "rudderDeleteAccountId" + DeliveryAccountIdKey = "rudderAccountId" +) + type AccountSecret struct { ExpirationDate string `json:"expirationDate"` Secret json.RawMessage `json:"secret"` @@ -39,6 +56,7 @@ type OAuthStats struct { authErrCategory string destDefName string isTokenFetch bool // This stats field is used to identify if a request to get token is arising from processor + flowType RudderFlow } type DisableDestinationResponse struct { @@ -66,11 +84,11 @@ type OAuthErrResHandler struct { destAuthInfoMap map[string]*AuthResponse refreshActiveMap map[string]bool // Used to check if a refresh request for an account is already InProgress disableDestActiveMap map[string]bool // Used to check if a disable destination request for a destination is already InProgress - TokenProvider tokenProvider + tokenProvider tokenProvider + rudderFlowType RudderFlow } type Authorizer interface { - Setup() DisableDestination(destination *backendconfig.DestinationT, workspaceId, rudderAccountId string) (statusCode int, resBody string) RefreshToken(refTokenParams *RefreshTokenParams) (int, *AuthResponse) FetchToken(fetchTokenParams *RefreshTokenParams) (int, *AuthResponse) @@ -85,11 +103,6 @@ type ControlPlaneRequestT struct { RequestType string // This is to add more refined stat tags } -// This function creates a new OauthErrorResponseHandler -func NewOAuthErrorHandler(provider tokenProvider) *OAuthErrResHandler { - return &OAuthErrResHandler{TokenProvider: provider} -} - var ( configBEURL string pkgLogger logger.Logger @@ -118,17 +131,60 @@ func Init() { loggerNm = "OAuthResponseHandler" } -func (authErrHandler *OAuthErrResHandler) Setup() { - authErrHandler.logger = pkgLogger - authErrHandler.tr = &http.Transport{} - // This timeout is kind of modifiable & it seemed like 10 mins for this is too much! - authErrHandler.client = &http.Client{Timeout: config.GetDuration("HttpClient.oauth.timeout", 30, time.Second)} - authErrHandler.destLockMap = make(map[string]*sync.RWMutex) - authErrHandler.accountLockMap = make(map[string]*sync.RWMutex) - authErrHandler.lockMapWMutex = &sync.RWMutex{} - authErrHandler.destAuthInfoMap = make(map[string]*AuthResponse) - authErrHandler.refreshActiveMap = make(map[string]bool) - authErrHandler.disableDestActiveMap = make(map[string]bool) +func GetAuthType(config map[string]interface{}) AuthType { + var lookupErr error + var authValue interface{} + if authValue, lookupErr = misc.NestedMapLookup(config, "auth", "type"); lookupErr != nil { + return "" + } + authType, ok := authValue.(string) + if !ok { + return "" + } + return AuthType(authType) +} + +// This function creates a new OauthErrorResponseHandler +func NewOAuthErrorHandler(provider tokenProvider, options ...func(*OAuthErrResHandler)) *OAuthErrResHandler { + oAuthErrResHandler := &OAuthErrResHandler{ + tokenProvider: provider, + logger: pkgLogger, + tr: &http.Transport{}, + client: &http.Client{Timeout: config.GetDuration("HttpClient.oauth.timeout", 30, time.Second)}, + // This timeout is kind of modifiable & it seemed like 10 mins for this is too much! + destLockMap: make(map[string]*sync.RWMutex), + accountLockMap: make(map[string]*sync.RWMutex), + lockMapWMutex: &sync.RWMutex{}, + destAuthInfoMap: make(map[string]*AuthResponse), + refreshActiveMap: make(map[string]bool), + disableDestActiveMap: make(map[string]bool), + rudderFlowType: RudderFlow_Delivery, + } + for _, opt := range options { + opt(oAuthErrResHandler) + } + return oAuthErrResHandler +} + +func GetAccountId(config map[string]interface{}, idKey string) string { + if rudderAccountIdInterface, found := config[idKey]; found { + if rudderAccountId, ok := rudderAccountIdInterface.(string); ok { + return rudderAccountId + } + } + return "" +} + +func WithRudderFlow(rudderFlow RudderFlow) func(*OAuthErrResHandler) { + return func(authErrHandle *OAuthErrResHandler) { + authErrHandle.rudderFlowType = rudderFlow + } +} + +func WithOAuthClientTimeout(timeout time.Duration) func(*OAuthErrResHandler) { + return func(authErrHandle *OAuthErrResHandler) { + authErrHandle.client.Timeout = timeout + } } func (authErrHandler *OAuthErrResHandler) RefreshToken(refTokenParams *RefreshTokenParams) (int, *AuthResponse) { @@ -141,6 +197,7 @@ func (authErrHandler *OAuthErrResHandler) RefreshToken(refTokenParams *RefreshTo authErrCategory: REFRESH_TOKEN, errorMessage: "", destDefName: refTokenParams.DestDefName, + flowType: authErrHandler.rudderFlowType, } return authErrHandler.GetTokenInfo(refTokenParams, "Refresh token", authStats) } @@ -156,6 +213,7 @@ func (authErrHandler *OAuthErrResHandler) FetchToken(fetchTokenParams *RefreshTo errorMessage: "", destDefName: fetchTokenParams.DestDefName, isTokenFetch: true, + flowType: authErrHandler.rudderFlowType, } return authErrHandler.GetTokenInfo(fetchTokenParams, "Fetch token", authStats) } @@ -327,6 +385,7 @@ func (authStats *OAuthStats) SendTimerStats(startTime time.Time) { "isCallToCpApi": strconv.FormatBool(authStats.isCallToCpApi), "authErrCategory": authStats.authErrCategory, "destType": authStats.destDefName, + "flowType": string(authStats.flowType), }).SendTiming(time.Since(startTime)) } @@ -341,6 +400,7 @@ func (refStats *OAuthStats) SendCountStat() { "authErrCategory": refStats.authErrCategory, "destType": refStats.destDefName, "isTokenFetch": strconv.FormatBool(refStats.isTokenFetch), + "flowType": string(refStats.flowType), }).Increment() } @@ -358,6 +418,7 @@ func (authErrHandler *OAuthErrResHandler) DisableDestination(destination *backen authErrCategory: DISABLE_DEST, errorMessage: "", destDefName: destination.DestinationDefinition.Name, + flowType: authErrHandler.rudderFlowType, } defer func() { disableDestStats.statName = "disable_destination_total_req_latency" @@ -469,7 +530,7 @@ func (authErrHandler *OAuthErrResHandler) cpApiCall(cpReq *ControlPlaneRequestT) return http.StatusBadRequest, err.Error() } // Authorisation setting - req.SetBasicAuth(authErrHandler.TokenProvider.AccessToken(), "") + req.SetBasicAuth(authErrHandler.tokenProvider.AccessToken(), "") // Set content-type in order to send the body in request correctly if router_utils.IsNotEmptyString(cpReq.ContentType) {