Skip to content

Commit

Permalink
fixing concurrency processing of runID (#318)
Browse files Browse the repository at this point in the history
  • Loading branch information
jt-dd authored Jan 24, 2025
1 parent a0dc22f commit 3fe51d2
Showing 1 changed file with 44 additions and 2 deletions.
46 changes: 44 additions & 2 deletions pkg/ingestor/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"path/filepath"
"slices"
"strings"
"sync"

"github.com/DataDog/KubeHound/pkg/collector"
"github.com/DataDog/KubeHound/pkg/config"
Expand Down Expand Up @@ -37,20 +38,29 @@ type IngestorAPI struct {
notifier notifier.Notifier
Cfg *config.KubehoundConfig
providers *providers.ProvidersFactoryConfig

mu *sync.RWMutex // mutex to sync write to the runIDs map
runIDs map[string]bool // runIDs map to monitor and avoid concurrency processing on the same runID
}

var (
// NOTE(review): this hunk appears to show both sides of the diff; the two
// lines below look like the pre-change versions of the two that follow them.
_ API = (*IngestorAPI)(nil)
ErrAlreadyIngested = errors.New("ingestion already completed")
// Compile-time assertion that *IngestorAPI satisfies the API interface.
_ API = (*IngestorAPI)(nil)
// ErrAlreadyIngested is the sentinel error carrying "ingestion already completed".
ErrAlreadyIngested = errors.New("ingestion already completed")
// ErrCurrentlyIngesting is the sentinel error wrapped and returned by lockRunID
// when the requested runID is already marked as being processed.
ErrCurrentlyIngesting = errors.New("runID currently being processed skipping this request")
)

// NewIngestorAPI returns an IngestorAPI wired with the given configuration,
// puller, notifier and providers configuration. It also allocates a fresh
// mutex and an empty runIDs map, which lockRunID/unlockRunID use to track the
// runIDs currently being processed.
func NewIngestorAPI(cfg *config.KubehoundConfig, puller puller.DataPuller, notifier notifier.Notifier,
	p *providers.ProvidersFactoryConfig) *IngestorAPI {
	api := &IngestorAPI{
		Cfg:       cfg,
		providers: p,
		puller:    puller,
		notifier:  notifier,
		// Held by pointer so the struct never carries a mutex by value.
		mu:     &sync.RWMutex{},
		runIDs: map[string]bool{},
	}

	return api
}

Expand Down Expand Up @@ -137,6 +147,13 @@ func (g *IngestorAPI) Ingest(ctx context.Context, path string) error { //nolint:
clusterName := md.ClusterName
runID := md.RunID

err = g.lockRunID(runID)
if err != nil {
return err
}

defer g.unlockRunID(runID)

err = g.Cfg.ComputeDynamic(config.WithClusterName(clusterName), config.WithRunID(runID))
if err != nil {
return err
Expand Down Expand Up @@ -285,3 +302,28 @@ func (g *IngestorAPI) isAlreadyIngestedInDB(ctx context.Context, clusterName str
// Notify forwards clusterName and runID to the configured notifier and
// returns whatever error that call produces, unchanged.
func (g *IngestorAPI) Notify(ctx context.Context, clusterName string, runID string) error {
	err := g.notifier.Notify(ctx, clusterName, runID)

	return err
}

// lockRunID marks runID as being processed in the runIDs map. The mutex is
// held for the whole check-and-set, so two callers cannot both claim the same
// runID.
//
// It returns ErrCurrentlyIngesting, wrapped with the runID, when the runID is
// already marked as in progress; otherwise it records the runID and returns nil.
func (g *IngestorAPI) lockRunID(runID string) error {
	g.mu.Lock()
	defer g.mu.Unlock()

	// A missing key reads as false, so a single lookup covers both
	// "never seen" and "seen but released".
	if inProgress := g.runIDs[runID]; inProgress {
		return fmt.Errorf("%w [runID:%s]", ErrCurrentlyIngesting, runID)
	}

	g.runIDs[runID] = true

	return nil
}

// unlockRunID releases runID so that a later request for the same runID is
// accepted again.
//
// The entry is deleted rather than set to false: lockRunID treats a missing
// key the same as false, and deleting keeps the map from accumulating one
// entry for every runID ever processed.
func (g *IngestorAPI) unlockRunID(runID string) {
	g.mu.Lock()
	defer g.mu.Unlock()

	delete(g.runIDs, runID)
}

0 comments on commit 3fe51d2

Please sign in to comment.