feat: support using badgerDB for suppressions
atzoum committed Nov 15, 2022
1 parent d01f98b commit e3d78dd
Showing 38 changed files with 1,950 additions and 876 deletions.
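Note: only a few of the 38 changed files are expanded below, and they cover the wiring rather than the store itself: a context.Context is threaded into the suppression feature's Setup, and package-global loggers become injected logger.Logger dependencies. The badgerDB-backed suppression store lives in files not shown here. As orientation, such a store reduces to a few calls against the badger v3 API; the repository type, key layout, and method names in this sketch are illustrative assumptions, not the commit's actual code — only the badger calls themselves are real API.

package suppression

import (
	"errors"
	"fmt"

	badger "github.com/dgraph-io/badger/v3"
)

// badgerRepository is a hypothetical sketch of a badger-backed suppression store.
type badgerRepository struct {
	db *badger.DB
}

func newBadgerRepository(path string) (*badgerRepository, error) {
	// Open badger with its default on-disk options.
	db, err := badger.Open(badger.DefaultOptions(path))
	if err != nil {
		return nil, fmt.Errorf("opening badger db: %w", err)
	}
	return &badgerRepository{db: db}, nil
}

// add records a suppressed user; the workspaceID:userID key layout is an assumption.
func (r *badgerRepository) add(workspaceID, userID string) error {
	return r.db.Update(func(txn *badger.Txn) error {
		return txn.Set([]byte(workspaceID+":"+userID), []byte{1})
	})
}

// isSuppressed reports whether a matching key exists.
func (r *badgerRepository) isSuppressed(workspaceID, userID string) (bool, error) {
	err := r.db.View(func(txn *badger.Txn) error {
		_, err := txn.Get([]byte(workspaceID + ":" + userID))
		return err
	})
	if errors.Is(err, badger.ErrKeyNotFound) {
		return false, nil
	}
	return err == nil, err
}
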
5 changes: 5 additions & 0 deletions app/app.go
@@ -74,18 +74,23 @@ func (a *app) initCPUProfiling() {
 }
 
 func (a *app) initFeatures() {
+	enterpriseLogger := logger.NewLogger().Child("enterprise")
 	a.features = &Features{
 		SuppressUser: &suppression.Factory{
 			EnterpriseToken: a.options.EnterpriseToken,
+			Log:             enterpriseLogger.Child("suppress-user"),
 		},
 		Reporting: &reporting.Factory{
 			EnterpriseToken: a.options.EnterpriseToken,
+			Log:             enterpriseLogger.Child("reporting"),
 		},
 		Replay: &replay.Factory{
 			EnterpriseToken: a.options.EnterpriseToken,
+			Log:             enterpriseLogger.Child("replay"),
 		},
 		ConfigEnv: &configenv.Factory{
 			EnterpriseToken: a.options.EnterpriseToken,
+			Log:             enterpriseLogger.Child("config-env"),
 		},
 	}
 }
1 change: 1 addition & 0 deletions app/apphandlers/embeddedAppHandler.go
@@ -209,6 +209,7 @@ func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.O
 
 	gw.SetReadonlyDBs(&readonlyGatewayDB, &readonlyRouterDB, &readonlyBatchRouterDB)
 	err = gw.Setup(
+		ctx,
 		embedded.App, backendconfig.DefaultBackendConfig, gatewayDB,
 		&rateLimiter, embedded.VersionHandler, rsourcesService,
 	)
1 change: 1 addition & 0 deletions app/apphandlers/gatewayAppHandler.go
@@ -101,6 +101,7 @@ func (gatewayApp *GatewayApp) StartRudderCore(ctx context.Context, options *app.
 		return err
 	}
 	err = gw.Setup(
+		ctx,
 		gatewayApp.App, backendconfig.DefaultBackendConfig, gatewayDB,
 		&rateLimiter, gatewayApp.VersionHandler, rsourcesService,
 	)
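Note: both StartRudderCore variants now pass their ctx into gw.Setup, which is what lets work started during setup — such as polling the suppression service for updates — stop when the application context is cancelled. A minimal sketch of that lifecycle pattern, with illustrative names (not the gateway's actual code):

package main

import (
	"context"
	"log"
	"time"
)

// pollLoop runs syncOnce at a fixed interval until ctx is cancelled.
func pollLoop(ctx context.Context, interval time.Duration, syncOnce func(context.Context) error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return // application shutting down
		case <-ticker.C:
			if err := syncOnce(ctx); err != nil {
				log.Printf("sync failed: %v", err)
			}
		}
	}
}
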
2 changes: 1 addition & 1 deletion app/features.go
@@ -12,7 +12,7 @@ import (
 
 // SuppressUserFeature handles webhook event requests
 type SuppressUserFeature interface {
-	Setup(backendConfig backendconfig.BackendConfig) (types.UserSuppression, error)
+	Setup(ctx context.Context, backendConfig backendconfig.BackendConfig) (types.UserSuppression, error)
 }
 
 /*********************************
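Note: with the context parameter, an implementation can abort a blocking initial suppression-list sync when the caller goes away, and the caller decides how long Setup may block. A sketch of a call site — feature stands for any SuppressUserFeature, and the timeout is an arbitrary example:

setupCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

suppressor, err := feature.Setup(setupCtx, backendconfig.DefaultBackendConfig)
if err != nil {
	return fmt.Errorf("setting up user suppression: %w", err)
}
// suppressor satisfies types.UserSuppression and is consulted per incoming event.
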
19 changes: 9 additions & 10 deletions enterprise/config-env/configEnv.go
@@ -14,30 +14,29 @@ import (
 	"github.com/rudderlabs/rudder-server/utils/logger"
 )
 
-type HandleT struct{}
+type HandleT struct {
+	Log logger.Logger
+}
 
-var (
-	configEnvReplacer string
-	pkgLogger         logger.Logger
-)
+var configEnvReplacer string
 
 func loadConfig() {
 	configEnvReplacer = config.GetString("BackendConfig.configEnvReplacer", "env.")
 }
 
 // ReplaceConfigWithEnvVariables : Replaces all env variables in the config
-func (*HandleT) ReplaceConfigWithEnvVariables(workspaceConfig []byte) (updatedConfig []byte) {
+func (h *HandleT) ReplaceConfigWithEnvVariables(workspaceConfig []byte) (updatedConfig []byte) {
 	configMap := make(map[string]interface{}, 0)
 
 	err := json.Unmarshal(workspaceConfig, &configMap)
 	if err != nil {
-		pkgLogger.Error("[ConfigEnv] Error while parsing request", err, string(workspaceConfig))
+		h.Log.Error("[ConfigEnv] Error while parsing request", err, string(workspaceConfig))
 		return workspaceConfig
 	}
 
 	flattenedConfig, err := flatten.Flatten(configMap, "", flatten.DotStyle)
 	if err != nil {
-		pkgLogger.Errorf("[ConfigEnv] Failed to flatten workspace config: %v", err)
+		h.Log.Errorf("[ConfigEnv] Failed to flatten workspace config: %v", err)
 		return workspaceConfig
 	}
 
@@ -51,12 +50,12 @@ func (*HandleT) ReplaceConfigWithEnvVariables(workspaceConfig []byte) (updatedCo
 			envVarValue := os.Getenv(envVariable)
 			if envVarValue == "" {
 				errorMessage := fmt.Sprintf("[ConfigEnv] Missing envVariable: %s. Either set it as envVariable or remove %s from the destination config.", envVariable, configEnvReplacer)
-				pkgLogger.Errorf(errorMessage)
+				h.Log.Errorf(errorMessage)
 				continue
 			}
 			workspaceConfig, err = sjson.SetBytes(workspaceConfig, configKey, envVarValue)
 			if err != nil {
-				pkgLogger.Error("[ConfigEnv] Failed to set config for %s", configKey)
+				h.Log.Error("[ConfigEnv] Failed to set config for %s", configKey)
 			}
 		}
 	}
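Note: the handler's behavior is unchanged by this commit — the workspace config is parsed, flattened to dotted keys, and (per the elided matching loop, as the error messages suggest) any leaf value carrying the configured replacer prefix (default "env.") is overwritten with the matching environment variable via sjson.SetBytes. An illustrative round trip, with hypothetical values:

// Illustrative only. Assuming MY_API_KEY=secret123 is exported in the environment:
h := &HandleT{Log: logger.NewLogger().Child("config-env")}
in := []byte(`{"destConfig":{"apiKey":"env.MY_API_KEY"}}`)
out := h.ReplaceConfigWithEnvVariables(in)
// out now reads {"destConfig":{"apiKey":"secret123"}}
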
10 changes: 7 additions & 3 deletions enterprise/config-env/setup.go
@@ -7,19 +7,23 @@ import (
 
 type Factory struct {
 	EnterpriseToken string
+	Log             logger.Logger
 }
 
 // Setup initializes Suppress User feature
 func (m *Factory) Setup() types.ConfigEnvI {
+	if m.Log == nil {
+		m.Log = logger.NewLogger().Child("enterprise").Child("config-env")
+	}
 	if m.EnterpriseToken == "" {
 		return &NOOP{}
 	}
 
 	loadConfig()
-	pkgLogger = logger.NewLogger().Child("enterprise").Child("config-env")
+	m.Log = logger.NewLogger().Child("enterprise").Child("config-env")
 
-	pkgLogger.Info("[[ ConfigEnv ]] Setting up config env handler")
-	handle := &HandleT{}
+	m.Log.Info("[[ ConfigEnv ]] Setting up config env handler")
+	handle := &HandleT{Log: m.Log}
 
 	return handle
 }
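Note: as committed, the unconditional m.Log assignment after loadConfig() overrides any caller-injected logger on the enterprise path; the nil guard only takes effect once that line is removed. Intended usage, sketched with illustrative values:

factory := &configenv.Factory{
	EnterpriseToken: enterpriseToken, // an empty token yields the NOOP implementation
	Log:             logger.NewLogger().Child("enterprise").Child("config-env"),
}
configEnvHandler := factory.Setup()
updated := configEnvHandler.ReplaceConfigWithEnvVariables(rawWorkspaceConfig)
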
34 changes: 16 additions & 18 deletions enterprise/replay/dumpsloader.go
@@ -18,12 +18,9 @@ import (
 	"github.com/tidwall/gjson"
 )
 
-func init() {
-	pkgLogger = logger.NewLogger().Child("enterprise").Child("replay").Child("dumpsLoader")
-}
-
 // DumpsLoaderHandleT - dumps-loader handle
 type dumpsLoaderHandleT struct {
+	log      logger.Logger
 	dbHandle *jobsdb.HandleT
 	prefix   string
 	bucket   string
@@ -78,7 +75,7 @@ type OrderedJobs struct {
 	Job       *jobsdb.JobT
 }
 
-func storeJobs(ctx context.Context, objects []OrderedJobs, dbHandle *jobsdb.HandleT) {
+func storeJobs(ctx context.Context, objects []OrderedJobs, dbHandle *jobsdb.HandleT, log logger.Logger) {
 	// sorting dumps list on index
 	sort.Slice(objects, func(i, j int) bool {
 		return objects[i].SortIndex < objects[j].SortIndex
@@ -89,7 +86,7 @@ func storeJobs(ctx context.Context, objects []OrderedJobs, dbHandle *jobsdb.Hand
 		jobs = append(jobs, object.Job)
 	}
 
-	pkgLogger.Info("Total dumps count : ", len(objects))
+	log.Info("Total dumps count : ", len(objects))
 	err := dbHandle.Store(ctx, jobs)
 	if err != nil {
 		panic(fmt.Errorf("Failed to write dumps locations to DB with error: %w", err))
@@ -103,7 +100,7 @@ func (gwHandle *GWReplayRequestHandler) fetchDumpsList(ctx context.Context) {
 	maxItems := config.GetInt64("MAX_ITEMS", 1000)           // MAX_ITEMS is the max number of files to be fetched in one iteration from object storage
 	uploadMaxItems := config.GetInt64("UPLOAD_MAX_ITEMS", 1) // UPLOAD_MAX_ITEMS is the max number of objects to be uploaded to postgres
 
-	pkgLogger.Info("Fetching gw dump files list")
+	gwHandle.handle.log.Info("Fetching gw dump files list")
 	objects := make([]OrderedJobs, 0)
 
 	iter := filemanager.IterateFilesWithPrefix(ctx,
@@ -131,8 +128,8 @@ func (gwHandle *GWReplayRequestHandler) fetchDumpsList(ctx context.Context) {
 		if err == nil {
 			pass = maxJobCreatedAt >= startTimeMilli && minJobCreatedAt <= endTimeMilli
 		} else {
-			pkgLogger.Infof("gw dump name(%s) is not of the expected format. Parse failed with error %w", object.Key, err)
-			pkgLogger.Info("Falling back to comparing start and end time stamps with gw dump last modified.")
+			gwHandle.handle.log.Infof("gw dump name(%s) is not of the expected format. Parse failed with error %w", object.Key, err)
+			gwHandle.handle.log.Info("Falling back to comparing start and end time stamps with gw dump last modified.")
 			pass = object.LastModified.After(gwHandle.handle.startTime) && object.LastModified.Before(gwHandle.handle.endTime)
 		}
 
@@ -148,25 +145,25 @@ func (gwHandle *GWReplayRequestHandler) fetchDumpsList(ctx context.Context) {
 			}
 		}
 		if len(objects) >= int(uploadMaxItems) {
-			storeJobs(ctx, objects, gwHandle.handle.dbHandle)
+			storeJobs(ctx, objects, gwHandle.handle.dbHandle, gwHandle.handle.log)
 			objects = nil
 		}
 	}
 	if iter.Err() != nil {
 		panic(fmt.Errorf("Failed to iterate gw dump files with error: %w", iter.Err()))
 	}
 	if len(objects) != 0 {
-		storeJobs(ctx, objects, gwHandle.handle.dbHandle)
+		storeJobs(ctx, objects, gwHandle.handle.dbHandle, gwHandle.handle.log)
 		objects = nil
 	}
 
-	pkgLogger.Info("Dumps loader job is done")
+	gwHandle.handle.log.Info("Dumps loader job is done")
 	gwHandle.handle.done = true
 }
 
 func (procHandle *ProcErrorRequestHandler) fetchDumpsList(ctx context.Context) {
 	objects := make([]OrderedJobs, 0)
-	pkgLogger.Info("Fetching proc err files list")
+	procHandle.handle.log.Info("Fetching proc err files list")
 	var err error
 	maxItems := config.GetInt64("MAX_ITEMS", 1000)           // MAX_ITEMS is the max number of files to be fetched in one iteration from object storage
 	uploadMaxItems := config.GetInt64("UPLOAD_MAX_ITEMS", 1) // UPLOAD_MAX_ITEMS is the max number of objects to be uploaded to postgres
@@ -181,7 +178,7 @@ func (procHandle *ProcErrorRequestHandler) fetchDumpsList(ctx context.Context) {
 		object := iter.Get()
 		if strings.Contains(object.Key, "rudder-proc-err-logs") {
 			if object.LastModified.Before(procHandle.handle.startTime) || (object.LastModified.Sub(procHandle.handle.endTime).Hours() > 1) {
-				pkgLogger.Debugf("Skipping object: %v ObjectLastModifiedTime: %v", object.Key, object.LastModified)
+				procHandle.handle.log.Debugf("Skipping object: %v ObjectLastModifiedTime: %v", object.Key, object.LastModified)
 				continue
 			}
 			key := object.Key
@@ -204,7 +201,7 @@ func (procHandle *ProcErrorRequestHandler) fetchDumpsList(ctx context.Context) {
 			objects = append(objects, OrderedJobs{Job: &job, SortIndex: idx})
 		}
 		if len(objects) >= int(uploadMaxItems) {
-			storeJobs(ctx, objects, procHandle.handle.dbHandle)
+			storeJobs(ctx, objects, procHandle.handle.dbHandle, procHandle.handle.log)
 			objects = nil
 		}
 
@@ -213,10 +210,10 @@ func (procHandle *ProcErrorRequestHandler) fetchDumpsList(ctx context.Context) {
 		panic(fmt.Errorf("Failed to iterate proc err files with error: %w", iter.Err()))
 	}
 	if len(objects) != 0 {
-		storeJobs(ctx, objects, procHandle.handle.dbHandle)
+		storeJobs(ctx, objects, procHandle.handle.dbHandle, procHandle.handle.log)
 	}
 
-	pkgLogger.Info("Dumps loader job is done")
+	procHandle.handle.log.Info("Dumps loader job is done")
 	procHandle.handle.done = true
 }
 
@@ -226,8 +223,9 @@ func (handle *dumpsLoaderHandleT) handleRecovery() {
 }
 
 // Setup sets up dumps-loader.
-func (handle *dumpsLoaderHandleT) Setup(ctx context.Context, db *jobsdb.HandleT, tablePrefix string, uploader filemanager.FileManager, bucket string) {
+func (handle *dumpsLoaderHandleT) Setup(ctx context.Context, db *jobsdb.HandleT, tablePrefix string, uploader filemanager.FileManager, bucket string, log logger.Logger) {
 	var err error
+	handle.log = log
 	handle.dbHandle = db
 	handle.handleRecovery()
 
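Note: the mechanical change repeated through this file — deleting the pkgLogger global and the init() that configured it, and passing a logger.Logger through Setup and storeJobs instead — is the usual move from package state to injected dependencies, which lets tests substitute their own logger. The pattern in isolation, with illustrative names:

// Before: package-level state, configured as an import side effect.
var pkgLogger logger.Logger

func init() { pkgLogger = logger.NewLogger().Child("dumpsLoader") }

// After: the dependency is explicit on the handle that uses it.
type loader struct {
	log logger.Logger
}

func (l *loader) run() {
	l.log.Info("Dumps loader job is done")
}
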
24 changes: 14 additions & 10 deletions enterprise/replay/replay.go
@@ -10,9 +10,11 @@ import (
 	"github.com/rudderlabs/rudder-server/jobsdb"
 	"github.com/rudderlabs/rudder-server/processor/transformer"
 	"github.com/rudderlabs/rudder-server/services/filemanager"
+	"github.com/rudderlabs/rudder-server/utils/logger"
 )
 
 type Handler struct {
+	log    logger.Logger
 	bucket string
 	db     *jobsdb.HandleT
 	toDB   *jobsdb.HandleT
@@ -26,11 +28,11 @@
 }
 
 func (handle *Handler) generatorLoop(ctx context.Context) {
-	pkgLogger.Infof("generator reading from replay_jobs_* started")
+	handle.log.Infof("generator reading from replay_jobs_* started")
 	var breakLoop bool
 	select {
 	case <-ctx.Done():
-		pkgLogger.Infof("generator reading from replay_jobs_* stopped:Context cancelled")
+		handle.log.Infof("generator reading from replay_jobs_* stopped:Context cancelled")
 		return
 	case <-handle.initSourceWorkersChannel:
 	}
@@ -41,7 +43,7 @@
 		}
 		toRetry, err := handle.db.GetToRetry(context.TODO(), queryParams)
 		if err != nil {
-			pkgLogger.Errorf("Error getting to retry jobs: %v", err)
+			handle.log.Errorf("Error getting to retry jobs: %v", err)
 			panic(err)
 		}
 		combinedList := toRetry.Jobs
@@ -50,21 +52,21 @@ func (handle *Handler) generatorLoop(ctx context.Context) {
 			queryParams.JobsLimit -= len(combinedList)
 			unprocessed, err := handle.db.GetUnprocessed(context.TODO(), queryParams)
 			if err != nil {
-				pkgLogger.Errorf("Error getting unprocessed jobs: %v", err)
+				handle.log.Errorf("Error getting unprocessed jobs: %v", err)
 				panic(err)
 			}
 			combinedList = append(combinedList, unprocessed.Jobs...)
 		}
-		pkgLogger.Infof("length of combinedList : %d", len(combinedList))
+		handle.log.Infof("length of combinedList : %d", len(combinedList))
 
 		if len(combinedList) == 0 {
 			if breakLoop {
 				executingList, err := handle.db.GetExecuting(context.TODO(), jobsdb.GetQueryParamsT{CustomValFilters: []string{"replay"}, JobsLimit: handle.dbReadSize})
 				if err != nil {
-					pkgLogger.Errorf("Error getting executing jobs: %v", err)
+					handle.log.Errorf("Error getting executing jobs: %v", err)
 					panic(err)
 				}
-				pkgLogger.Infof("breakLoop is set. Pending executing: %d", len(executingList.Jobs))
+				handle.log.Infof("breakLoop is set. Pending executing: %d", len(executingList.Jobs))
 				if len(executingList.Jobs) == 0 {
 					break
 				}
@@ -74,7 +76,7 @@ func (handle *Handler) generatorLoop(ctx context.Context) {
 				breakLoop = true
 			}
 
-			pkgLogger.Debugf("DB Read Complete. No Jobs to process")
+			handle.log.Debugf("DB Read Complete. No Jobs to process")
 			time.Sleep(5 * time.Second)
 			continue
 		}
@@ -121,7 +123,7 @@ func (handle *Handler) generatorLoop(ctx context.Context) {
 
 	// Since generator read is done, closing worker channels
 	for _, worker := range handle.workers {
-		pkgLogger.Infof("Closing worker channels")
+		handle.log.Infof("Closing worker channels")
 		close(worker.channel)
 	}
 }
@@ -130,6 +132,7 @@ func (handle *Handler) initSourceWorkers(ctx context.Context) {
 	handle.workers = make([]*SourceWorkerT, handle.noOfWorkers)
 	for i := 0; i < handle.noOfWorkers; i++ {
 		worker := &SourceWorkerT{
+			log:           handle.log,
 			channel:       make(chan *jobsdb.JobT, handle.dbReadSize),
 			workerID:      i,
 			replayHandler: handle,
@@ -144,7 +147,8 @@
 	handle.initSourceWorkersChannel <- true
 }
 
-func (handle *Handler) Setup(ctx context.Context, dumpsLoader *dumpsLoaderHandleT, db, toDB *jobsdb.HandleT, tablePrefix string, uploader filemanager.FileManager, bucket string) {
+func (handle *Handler) Setup(ctx context.Context, dumpsLoader *dumpsLoaderHandleT, db, toDB *jobsdb.HandleT, tablePrefix string, uploader filemanager.FileManager, bucket string, log logger.Logger) {
+	handle.log = log
 	handle.db = db
 	handle.toDB = toDB
 	handle.bucket = bucket
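Note: the replay factory that supplies these new parameters is outside the visible diff. A hedged sketch of the wiring, assuming replayDB, toDB, uploader, and bucket already exist and using "replay" as an illustrative table prefix — the Setup signatures themselves are taken from the diff above:

log := logger.NewLogger().Child("enterprise").Child("replay")

dumpsLoader := &dumpsLoaderHandleT{}
dumpsLoader.Setup(ctx, replayDB, "replay", uploader, bucket, log.Child("dumpsLoader"))

replayHandler := &Handler{}
replayHandler.Setup(ctx, dumpsLoader, replayDB, toDB, "replay", uploader, bucket, log)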