Skip to content

Commit

Permalink
local-ingest-dump-pipeline (#238)
Browse files Browse the repository at this point in the history
* adding ingest dump auto pipeline

* adding some logs

* fix double import
  • Loading branch information
jt-dd authored Aug 2, 2024
1 parent b7c6305 commit 2a07ce2
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 11 deletions.
3 changes: 1 addition & 2 deletions cmd/kubehound/backend.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"github.com/DataDog/KubeHound/pkg/backend"
docker "github.com/DataDog/KubeHound/pkg/backend"
"github.com/spf13/cobra"
)
Expand All @@ -11,7 +10,7 @@ var (
hard bool
composePath []string

uiProfile = backend.DefaultUIProfile
uiProfile = docker.DefaultUIProfile
uiInvana bool
)

Expand Down
29 changes: 28 additions & 1 deletion cmd/kubehound/dumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"os"

docker "github.com/DataDog/KubeHound/pkg/backend"
"github.com/DataDog/KubeHound/pkg/cmd"
"github.com/DataDog/KubeHound/pkg/config"
"github.com/DataDog/KubeHound/pkg/kubehound/core"
Expand All @@ -12,6 +13,11 @@ import (
"github.com/spf13/viper"
)

var (
runLocalIngest bool
startBackend bool
)

var (
dumpCmd = &cobra.Command{
Use: "dump",
Expand Down Expand Up @@ -79,7 +85,25 @@ var (
if err != nil {
return fmt.Errorf("get config: %w", err)
}
_, err = core.DumpCore(cobraCmd.Context(), khCfg, false)
resultPath, err := core.DumpCore(cobraCmd.Context(), khCfg, false)
if err != nil {
return fmt.Errorf("dump core: %w", err)
}

if startBackend {
err = docker.NewBackend(cobraCmd.Context(), composePath, docker.DefaultUIProfile)
if err != nil {
return fmt.Errorf("new backend: %w", err)
}
err = docker.Up(cobraCmd.Context())
if err != nil {
return fmt.Errorf("docker up: %w", err)
}
}

if runLocalIngest {
err = core.CoreLocalIngest(cobraCmd.Context(), khCfg, resultPath)
}

return err
},
Expand All @@ -92,6 +116,9 @@ func init() {
cmd.InitRemoteDumpCmd(dumpRemoteCmd)
cmd.InitRemoteIngestCmd(dumpRemoteCmd, false)

dumpLocalCmd.Flags().BoolVar(&runLocalIngest, "ingest", false, "Run the ingestion after the dump")
dumpLocalCmd.Flags().BoolVar(&startBackend, "backend", false, "Start the backend after the dump")

dumpCmd.AddCommand(dumpRemoteCmd)
dumpCmd.AddCommand(dumpLocalCmd)
rootCmd.AddCommand(dumpCmd)
Expand Down
23 changes: 15 additions & 8 deletions pkg/collector/k8s_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@ import (

// FileCollector implements a collector based on local K8s API json files generated outside the KubeHound application via e.g kubectl.
type k8sAPICollector struct {
clientset kubernetes.Interface
log *log.KubehoundLogger
rl ratelimit.Limiter
cfg *config.K8SAPICollectorConfig
tags collectorTags
waitTime map[string]time.Duration
startTime time.Time
mu *sync.Mutex
clientset kubernetes.Interface
log *log.KubehoundLogger
rl ratelimit.Limiter
cfg *config.K8SAPICollectorConfig
tags collectorTags
waitTime map[string]time.Duration
startTime time.Time
mu *sync.Mutex
isStreaming bool
}

const (
Expand Down Expand Up @@ -138,6 +139,12 @@ func (c *k8sAPICollector) wait(_ context.Context, resourceType string, tags []st
defer c.mu.Unlock()
c.waitTime[resourceType] += waitTime

// Display a message to tell the user the streaming has started (only once after the approval has been made)
if !c.isStreaming {
log.I.Info("Streaming data from the K8s API")
c.isStreaming = true
}

// entity := tag.Entity(resourceType)
err := statsd.Gauge(metric.CollectorWait, float64(c.waitTime[resourceType]), tags, 1)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/dump/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func dumpK8sObjs(ctx context.Context, operationName string, entity string, strea
var err error
defer func() { span.Finish(tracer.WithError(err)) }()
err = streamFunc(ctx)
log.I.Infof("Dumping %s done", entity)

return ctx, err
}
2 changes: 2 additions & 0 deletions pkg/kubehound/core/core_grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ func CoreClientGRPCIngest(ingestorConfig config.IngestorConfig, clusteName strin
defer conn.Close()
client := pb.NewAPIClient(conn)

log.I.Infof("Launching ingestion on %s [rundID: %s]", ingestorConfig.API.Endpoint, runID)

_, err = client.Ingest(context.Background(), &pb.IngestRequest{
RunId: runID,
ClusterName: clusteName,
Expand Down

0 comments on commit 2a07ce2

Please sign in to comment.