Skip to content

Commit

Permalink
fixup! feat: support using badgerDB for suppressions
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed Nov 20, 2022
1 parent 59c3991 commit 94b7b37
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 44 deletions.
2 changes: 1 addition & 1 deletion enterprise/suppress-user/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (m *Factory) Setup(ctx context.Context, backendConfig backendconfig.Backend
return nil, err
}

h := newHandler(ctx, repository, m.Log)
h := newHandler(repository, m.Log)

rruntime.Go(func() {
syncer.SyncLoop(ctx)
Expand Down
39 changes: 7 additions & 32 deletions enterprise/suppress-user/handler.go
Original file line number Diff line number Diff line change
@@ -1,57 +1,32 @@
package suppression

import (
"context"
"fmt"
"sync"
"time"
"errors"

"github.com/rudderlabs/rudder-server/enterprise/suppress-user/model"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/samber/lo"
)

// newHandler creates a new handler for the suppression feature
func newHandler(ctx context.Context, r Repository, log logger.Logger) *handler {
func newHandler(r Repository, log logger.Logger) *handler {
h := &handler{
r: r,
log: log,
}
// we don't want to flood logs if, e.g. the suppression repository is restoring,
// so we are debouncing the logging
h.errLog.debounceLog, h.errLog.cancel = lo.NewDebounce(1*time.Second, func() {
h.errLog.errMu.Lock()
defer h.errLog.errMu.Unlock()
if h.errLog.err != nil {
h.log.Warn(h.errLog.err.Error())
}
})
go func() {
<-ctx.Done()
h.errLog.cancel()
}()
return h
}

// handler is a handle to this object
type handler struct {
log logger.Logger
r Repository
errLog struct {
debounceLog func() // logs suppression failures with a debounce, once every 1 second
cancel func() // cancels the debounce timer
errMu sync.Mutex
err error // the last error
}
log logger.Logger
r Repository
}

func (h *handler) IsSuppressedUser(workspaceID, userID, sourceID string) bool {
h.log.Debugf("IsSuppressedUser called for workspace: %s, user %s, source %s", workspaceID, userID, sourceID)
suppressed, err := h.r.Suppressed(workspaceID, userID, sourceID)
if err != nil {
h.errLog.errMu.Lock()
h.errLog.err = fmt.Errorf("suppression check failed for workspace: %s, user: %s, source: %s: %w", workspaceID, userID, sourceID, err)
h.errLog.debounceLog()
h.errLog.errMu.Unlock()
if err != nil && !errors.Is(err, model.ErrRestoring) {
h.log.Errorf("Suppression check failed for workspace: %s, user: %s, source: %s: %w", workspaceID, userID, sourceID, err)
}
return suppressed
}
21 changes: 15 additions & 6 deletions enterprise/suppress-user/handler_test.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,30 @@
package suppression

import (
"context"
"fmt"
"math/rand"
"sync"
"testing"
"time"

"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/stretchr/testify/require"
)

func TestDebounceLogConcurrency(t *testing.T) {
ctx := context.Background()
func init() {
rand.Seed(time.Now().UnixNano())
}

func TestIsSuppressedConcurrency(t *testing.T) {
log := &tLog{Logger: logger.NOP}
h := newHandler(ctx, &fakeSuppresser{}, log)
h := newHandler(&fakeSuppresser{}, log)

var wg sync.WaitGroup
wg.Add(1000)
for i := 0; i < 1000; i++ {
go func() {
defer wg.Done()
_ = h.IsSuppressedUser("workspaceID", "userID", "sourceID")
require.False(t, h.IsSuppressedUser("workspaceID", "userID", "sourceID"))
}()
}
wg.Wait()
Expand All @@ -32,7 +36,12 @@ type fakeSuppresser struct {
}

func (*fakeSuppresser) Suppressed(_, _, _ string) (bool, error) {
return false, fmt.Errorf("some error")
// random failures, but always returning false
if rand.Intn(2)%2 == 0 { // skipcq: GSC-G404
return false, fmt.Errorf("some error")
} else {
return false, nil
}
}

type tLog struct {
Expand Down
6 changes: 1 addition & 5 deletions enterprise/suppress-user/suppress_user_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ func generateTests(getRepo func() Repository) {
var h *handler
var serverResponse syncResponse
var server *httptest.Server
var ctx context.Context
var cancel context.CancelFunc
newTestServer := func() *httptest.Server {
var count int
var prevRespBody []byte
Expand All @@ -88,17 +86,15 @@ func generateTests(getRepo func() Repository) {
}

BeforeEach(func() {
ctx, cancel = context.WithCancel(context.Background())
config.Reset()
backendconfig.Init()
server = newTestServer()

r := getRepo()
h = newHandler(ctx, r, logger.NOP)
h = newHandler(r, logger.NOP)
})
AfterEach(func() {
server.Close()
cancel()
})
Context("sync error scenarios", func() {
It("returns an error when a wrong server address is provided", func() {
Expand Down

0 comments on commit 94b7b37

Please sign in to comment.