Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
Signed-off-by: Gyuho Lee <[email protected]>
  • Loading branch information
gyuho committed Jan 13, 2025
1 parent 1bd689e commit 4eaa783
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 0 deletions.
5 changes: 5 additions & 0 deletions components/accelerator/nvidia/nccl/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ func (c *component) Events(ctx context.Context, since time.Time) ([]components.E
if common_dmesg.GetDefaultLogPoller() == nil {
return nil, nil
}
select {
case <-common_dmesg.GetDefaultLogPoller().WaitStart():
case <-ctx.Done():
return nil, ctx.Err()
}

logItems, err := common_dmesg.GetDefaultLogPoller().Find(since)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions components/accelerator/nvidia/peermem/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ func (c *component) getEvents(ctx context.Context, since time.Time) ([]component
if common_dmesg.GetDefaultLogPoller() == nil {
return nil, nil
}
select {
case <-common_dmesg.GetDefaultLogPoller().WaitStart():
case <-ctx.Done():
return nil, ctx.Err()
}

logItems, err := common_dmesg.GetDefaultLogPoller().Find(since)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions components/cpu/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ func (c *component) Events(ctx context.Context, since time.Time) ([]components.E
if common_dmesg.GetDefaultLogPoller() == nil {
return nil, nil
}
select {
case <-common_dmesg.GetDefaultLogPoller().WaitStart():
case <-ctx.Done():
return nil, ctx.Err()
}

logItems, err := common_dmesg.GetDefaultLogPoller().Find(since)
if err != nil {
Expand Down
11 changes: 11 additions & 0 deletions components/cpu/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

query_config "github.com/leptonai/gpud/components/query/config"
"github.com/leptonai/gpud/pkg/sqlite"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand All @@ -16,11 +17,21 @@ func TestComponent(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

db, err := sqlite.Open(":memory:")
if err != nil {
t.Fatalf("failed to open db: %v", err)
}
defer db.Close()

component := New(
ctx,
Config{
Query: query_config.Config{
Interval: metav1.Duration{Duration: time.Second},
State: &query_config.State{
DBRO: db,
DBRW: db,
},
},
},
)
Expand Down
5 changes: 5 additions & 0 deletions components/fd/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ func (c *component) Events(ctx context.Context, since time.Time) ([]components.E
if common_dmesg.GetDefaultLogPoller() == nil {
return nil, nil
}
select {
case <-common_dmesg.GetDefaultLogPoller().WaitStart():
case <-ctx.Done():
return nil, ctx.Err()
}

logItems, err := common_dmesg.GetDefaultLogPoller().Find(since)
if err != nil {
Expand Down
11 changes: 11 additions & 0 deletions components/fd/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

query_config "github.com/leptonai/gpud/components/query/config"
"github.com/leptonai/gpud/pkg/sqlite"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand All @@ -16,11 +17,21 @@ func TestComponent(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

db, err := sqlite.Open(":memory:")
if err != nil {
t.Fatalf("failed to open db: %v", err)
}
defer db.Close()

component := New(
ctx,
Config{
Query: query_config.Config{
Interval: metav1.Duration{Duration: 5 * time.Second},
State: &query_config.State{
DBRO: db,
DBRW: db,
},
},
},
)
Expand Down
5 changes: 5 additions & 0 deletions components/memory/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ func (c *component) Events(ctx context.Context, since time.Time) ([]components.E
if common_dmesg.GetDefaultLogPoller() == nil {
return nil, nil
}
select {
case <-common_dmesg.GetDefaultLogPoller().WaitStart():
case <-ctx.Done():
return nil, ctx.Err()
}

logItems, err := common_dmesg.GetDefaultLogPoller().Find(since)
if err != nil {
Expand Down
15 changes: 15 additions & 0 deletions components/query/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type Poller interface {
// Redundant calls will be skipped if there's an existing poller.
Start(ctx context.Context, cfg query_config.Config, componentName string)

WaitStart() <-chan any

// Config returns the config used to start the poller.
// This is useful for debugging and logging.
Config() query_config.Config
Expand Down Expand Up @@ -84,6 +86,8 @@ func New(id string, cfg query_config.Config, getFunc GetFunc, getErrHandler GetE
startPollFunc: startPoll,
getFunc: getFunc,
getErrHandler: getErrHandler,
startedCloseOnce: sync.Once{},
startedCh: make(chan any),
cfg: cfg,
inflightComponents: make(map[string]any),
}
Expand All @@ -102,6 +106,9 @@ type poller struct {
ctx context.Context
cancel context.CancelFunc

startedCloseOnce sync.Once
startedCh chan any

cfgMu sync.RWMutex
cfg query_config.Config

Expand Down Expand Up @@ -213,9 +220,17 @@ func (pl *poller) Start(ctx context.Context, cfg query_config.Config, componentN
}
}()

pl.startedCloseOnce.Do(func() {
close(pl.startedCh)
})

log.Logger.Debugw("started poller", "caller", componentName, "inflightComponents", len(pl.inflightComponents))
}

// WaitStart exposes the poller's start-notification channel as receive-only.
// Start closes this channel (at most once, guarded by startedCloseOnce), so a
// receive on the returned channel unblocks once that close has happened.
// Callers typically select on it together with their own context's Done channel.
func (pl *poller) WaitStart() <-chan any {
	started := pl.startedCh
	return started
}

func (pl *poller) Stop(componentName string) bool {
pl.ctxMu.Lock()
defer pl.ctxMu.Unlock()
Expand Down

0 comments on commit 4eaa783

Please sign in to comment.