From 4eaa7839e0e6d17eaeb53cc156af7d2f62d3eff2 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Mon, 13 Jan 2025 11:13:19 +0800 Subject: [PATCH] update Signed-off-by: Gyuho Lee --- components/accelerator/nvidia/nccl/component.go | 5 +++++ .../accelerator/nvidia/peermem/component.go | 5 +++++ components/cpu/component.go | 5 +++++ components/cpu/component_test.go | 11 +++++++++++ components/fd/component.go | 5 +++++ components/fd/component_test.go | 11 +++++++++++ components/memory/component.go | 5 +++++ components/query/poller.go | 15 +++++++++++++++ 8 files changed, 62 insertions(+) diff --git a/components/accelerator/nvidia/nccl/component.go b/components/accelerator/nvidia/nccl/component.go index 44cbad7b..d72399e2 100644 --- a/components/accelerator/nvidia/nccl/component.go +++ b/components/accelerator/nvidia/nccl/component.go @@ -75,6 +75,11 @@ func (c *component) Events(ctx context.Context, since time.Time) ([]components.E if common_dmesg.GetDefaultLogPoller() == nil { return nil, nil } + select { + case <-common_dmesg.GetDefaultLogPoller().WaitStart(): + case <-ctx.Done(): + return nil, ctx.Err() + } logItems, err := common_dmesg.GetDefaultLogPoller().Find(since) if err != nil { diff --git a/components/accelerator/nvidia/peermem/component.go b/components/accelerator/nvidia/peermem/component.go index 82eae8e8..4fabaca6 100644 --- a/components/accelerator/nvidia/peermem/component.go +++ b/components/accelerator/nvidia/peermem/component.go @@ -119,6 +119,11 @@ func (c *component) getEvents(ctx context.Context, since time.Time) ([]component if common_dmesg.GetDefaultLogPoller() == nil { return nil, nil } + select { + case <-common_dmesg.GetDefaultLogPoller().WaitStart(): + case <-ctx.Done(): + return nil, ctx.Err() + } logItems, err := common_dmesg.GetDefaultLogPoller().Find(since) if err != nil { diff --git a/components/cpu/component.go b/components/cpu/component.go index 95d3ae4c..c6974b5e 100644 --- a/components/cpu/component.go +++ b/components/cpu/component.go @@ -107,6 +107,11 @@ func (c *component) Events(ctx context.Context, since time.Time) ([]components.E if common_dmesg.GetDefaultLogPoller() == nil { return nil, nil } + select { + case <-common_dmesg.GetDefaultLogPoller().WaitStart(): + case <-ctx.Done(): + return nil, ctx.Err() + } logItems, err := common_dmesg.GetDefaultLogPoller().Find(since) if err != nil { diff --git a/components/cpu/component_test.go b/components/cpu/component_test.go index 51d95c00..9060c17f 100644 --- a/components/cpu/component_test.go +++ b/components/cpu/component_test.go @@ -6,6 +6,7 @@ import ( "time" query_config "github.com/leptonai/gpud/components/query/config" + "github.com/leptonai/gpud/pkg/sqlite" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -16,11 +17,21 @@ func TestComponent(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + db, err := sqlite.Open(":memory:") + if err != nil { + t.Fatalf("failed to open db: %v", err) + } + defer db.Close() + component := New( ctx, Config{ Query: query_config.Config{ Interval: metav1.Duration{Duration: time.Second}, + State: &query_config.State{ + DBRO: db, + DBRW: db, + }, }, }, ) diff --git a/components/fd/component.go b/components/fd/component.go index e24ef8bb..199f23a2 100644 --- a/components/fd/component.go +++ b/components/fd/component.go @@ -107,6 +107,11 @@ func (c *component) Events(ctx context.Context, since time.Time) ([]components.E if common_dmesg.GetDefaultLogPoller() == nil { return nil, nil } + select { + case <-common_dmesg.GetDefaultLogPoller().WaitStart(): + case <-ctx.Done(): + return nil, ctx.Err() + } logItems, err := common_dmesg.GetDefaultLogPoller().Find(since) if err != nil { diff --git a/components/fd/component_test.go b/components/fd/component_test.go index dee6066b..1bc66b68 100644 --- a/components/fd/component_test.go +++ b/components/fd/component_test.go @@ -6,6 +6,7 @@ import ( "time" query_config "github.com/leptonai/gpud/components/query/config" + "github.com/leptonai/gpud/pkg/sqlite" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -16,11 +17,21 @@ func TestComponent(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + db, err := sqlite.Open(":memory:") + if err != nil { + t.Fatalf("failed to open db: %v", err) + } + defer db.Close() + component := New( ctx, Config{ Query: query_config.Config{ Interval: metav1.Duration{Duration: 5 * time.Second}, + State: &query_config.State{ + DBRO: db, + DBRW: db, + }, }, }, ) diff --git a/components/memory/component.go b/components/memory/component.go index 32ffdcbd..8477ae3b 100644 --- a/components/memory/component.go +++ b/components/memory/component.go @@ -107,6 +107,11 @@ func (c *component) Events(ctx context.Context, since time.Time) ([]components.E if common_dmesg.GetDefaultLogPoller() == nil { return nil, nil } + select { + case <-common_dmesg.GetDefaultLogPoller().WaitStart(): + case <-ctx.Done(): + return nil, ctx.Err() + } logItems, err := common_dmesg.GetDefaultLogPoller().Find(since) if err != nil { diff --git a/components/query/poller.go b/components/query/poller.go index eb0c9f8d..575fc9ee 100644 --- a/components/query/poller.go +++ b/components/query/poller.go @@ -27,6 +27,8 @@ type Poller interface { // Redundant calls will be skipped if there's an existing poller. Start(ctx context.Context, cfg query_config.Config, componentName string) + WaitStart() <-chan any + // Config returns the config used to start the poller. // This is useful for debugging and logging. Config() query_config.Config @@ -84,6 +86,8 @@ func New(id string, cfg query_config.Config, getFunc GetFunc, getErrHandler GetE startPollFunc: startPoll, getFunc: getFunc, getErrHandler: getErrHandler, + startedCloseOnce: sync.Once{}, + startedCh: make(chan any), cfg: cfg, inflightComponents: make(map[string]any), } @@ -102,6 +106,9 @@ type poller struct { ctx context.Context cancel context.CancelFunc + startedCloseOnce sync.Once + startedCh chan any + cfgMu sync.RWMutex cfg query_config.Config @@ -213,9 +220,17 @@ func (pl *poller) Start(ctx context.Context, cfg query_config.Config, componentN } }() + pl.startedCloseOnce.Do(func() { + close(pl.startedCh) + }) + log.Logger.Debugw("started poller", "caller", componentName, "inflightComponents", len(pl.inflightComponents)) } +func (pl *poller) WaitStart() <-chan any { + return pl.startedCh +} + func (pl *poller) Stop(componentName string) bool { pl.ctxMu.Lock() defer pl.ctxMu.Unlock()