Skip to content

Commit

Permalink
using sync.Map instead of mutex + Map for concurrency lock (#319)
Browse files Browse the repository at this point in the history
  • Loading branch information
jt-dd authored Jan 24, 2025
1 parent 3fe51d2 commit d9710f9
Showing 1 changed file with 6 additions and 15 deletions.
21 changes: 6 additions & 15 deletions pkg/ingestor/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ type IngestorAPI struct {
Cfg *config.KubehoundConfig
providers *providers.ProvidersFactoryConfig

mu *sync.RWMutex // mutex to sync write to the runIDs map
runIDs map[string]bool // runIDs map to monitor and avoid concurrency processing on the same runID
runIDs sync.Map // runIDs map to monitor and avoid concurrency processing on the same runID
}

var (
Expand All @@ -51,16 +50,12 @@ var (

func NewIngestorAPI(cfg *config.KubehoundConfig, puller puller.DataPuller, notifier notifier.Notifier,
p *providers.ProvidersFactoryConfig) *IngestorAPI {
var mu sync.RWMutex
var runIDs = make(map[string]bool)

return &IngestorAPI{
notifier: notifier,
puller: puller,
Cfg: cfg,
providers: p,
mu: &mu,
runIDs: runIDs,
runIDs: sync.Map{},
}
}

Expand Down Expand Up @@ -306,24 +301,20 @@ func (g *IngestorAPI) Notify(ctx context.Context, clusterName string, runID stri
// Using a map to monitor all runIDs being processed,
// Using a mutex to write/read data to the runIDs map
func (g *IngestorAPI) lockRunID(runID string) error {
g.mu.Lock()
defer g.mu.Unlock()
entry, ok := g.runIDs[runID]
_, ok := g.runIDs.Load(runID)

// If a runID is being processed, dropping the request
if ok && entry {
if ok {
return fmt.Errorf("%w [runID:%s]", ErrCurrentlyIngesting, runID)
}

// Locking the current runID
g.runIDs[runID] = true
g.runIDs.Store(runID, true)

return nil
}

// Delocking the runID
func (g *IngestorAPI) unlockRunID(runID string) {
g.mu.Lock()
g.runIDs[runID] = false
g.mu.Unlock()
g.runIDs.Delete(runID)
}

0 comments on commit d9710f9

Please sign in to comment.