Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dmesg, components): deprecate dmesg component, use it as a shared log poller, replace xid-sxid with existing separate components #290

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
136 changes: 0 additions & 136 deletions components/accelerator/nvidia/error-xid-sxid/component.go

This file was deleted.

4 changes: 0 additions & 4 deletions components/accelerator/nvidia/error-xid-sxid/id/id.go

This file was deleted.

123 changes: 77 additions & 46 deletions components/accelerator/nvidia/error/sxid/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,47 @@ package sxid
import (
"context"
"fmt"
"runtime"
"strconv"
"time"

"github.com/leptonai/gpud/components"
nvidia_component_error_sxid_id "github.com/leptonai/gpud/components/accelerator/nvidia/error/sxid/id"
nvidia_query_sxid "github.com/leptonai/gpud/components/accelerator/nvidia/query/sxid"
"github.com/leptonai/gpud/components/dmesg"
nvidia_xid_sxid_state "github.com/leptonai/gpud/components/accelerator/nvidia/query/xid-sxid-state"
common_dmesg "github.com/leptonai/gpud/components/common/dmesg"
"github.com/leptonai/gpud/log"

"github.com/dustin/go-humanize"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func New() components.Component {
return &component{}
func New(ctx context.Context, cfg Config) components.Component {
cfg.Query.SetDefaultsIfNotSet()

cctx, ccancel := context.WithCancel(ctx)

if runtime.GOOS == "linux" {
if perr := common_dmesg.SetDefaultLogPoller(ctx, cfg.Query.State.DBRW, cfg.Query.State.DBRO); perr != nil {
log.Logger.Warnw("failed to set default log poller", "error", perr)
} else {
common_dmesg.GetDefaultLogPoller().Start(cctx, cfg.Query, common_dmesg.Name)
}
}

return &component{
rootCtx: ctx,
cancel: ccancel,
cfg: cfg,
}
}

var _ components.Component = (*component)(nil)

type component struct{}
type component struct {
rootCtx context.Context
cancel context.CancelFunc
cfg Config
}

func (c *component) Name() string { return nvidia_component_error_sxid_id.Name }

Expand All @@ -32,56 +57,56 @@ func (c *component) States(ctx context.Context) ([]components.State, error) {
}}, nil
}

// tailScan fetches the latest output from the dmesg
// it is ok to call this function multiple times for the following reasons (thus shared with events method)
// 1) dmesg "TailScan" is cheap (just tails the last x number of lines)
func (c *component) tailScan() (*Output, error) {
dmesgC, err := components.GetComponent(dmesg.Name)
if err != nil {
return nil, err
}
const (
EventNameErroSXid = "error_sxid"

var dmesgComponent *dmesg.Component
if o, ok := dmesgC.(interface{ Unwrap() interface{} }); ok {
if unwrapped, ok := o.Unwrap().(*dmesg.Component); ok {
dmesgComponent = unwrapped
} else {
return nil, fmt.Errorf("expected *dmesg.Component, got %T", dmesgC)
}
EventKeyErroSXidUnixSeconds = "unix_seconds"
EventKeyErroSXidData = "data"
EventKeyErroSXidEncoding = "encoding"
EventValueErroSXidEncodingJSON = "json"
)

func (c *component) Events(ctx context.Context, since time.Time) ([]components.Event, error) {
if runtime.GOOS != "linux" {
return nil, nil
}
dmesgTailResults, err := dmesgComponent.TailScan()

events, err := nvidia_xid_sxid_state.ReadEvents(ctx, c.cfg.Query.State.DBRO, nvidia_xid_sxid_state.WithSince(since))
if err != nil {
return nil, err
}

o := &Output{}
for _, logItem := range dmesgTailResults.TailScanMatched {
if logItem.Error != nil {
continue
}
if logItem.Matched == nil {
continue
}
if logItem.Matched.Name != dmesg.EventNvidiaNVSwitchSXid {
continue
}

ev, err := nvidia_query_sxid.ParseDmesgLogLine(logItem.Time, logItem.Line)
if err != nil {
return nil, err
}
o.DmesgErrors = append(o.DmesgErrors, ev)
if len(events) == 0 {
log.Logger.Debugw("no event found", "component", c.Name(), "since", humanize.Time(since))
return nil, nil
}

return o, nil
}

func (c *component) Events(ctx context.Context, since time.Time) ([]components.Event, error) {
o, err := c.tailScan()
if err != nil {
return nil, err
log.Logger.Debugw("found events", "component", c.Name(), "since", humanize.Time(since), "count", len(events))
convertedEvents := make([]components.Event, 0, len(events))
for _, event := range events {
if sxidDetail := event.ToSXidDetail(); sxidDetail != nil {
msg := fmt.Sprintf("sxid %d detected by %s (%s)",
event.EventID,
event.DataSource,
humanize.Time(time.Unix(event.UnixSeconds, 0)),
)
sxidBytes, _ := sxidDetail.JSON()

convertedEvents = append(convertedEvents, components.Event{
Time: metav1.Time{Time: time.Unix(event.UnixSeconds, 0).UTC()},
Name: EventNameErroSXid,
Type: components.EventTypeCritical,
Message: msg,
ExtraInfo: map[string]string{
EventKeyErroSXidUnixSeconds: strconv.FormatInt(event.UnixSeconds, 10),
EventKeyErroSXidData: string(sxidBytes),
EventKeyErroSXidEncoding: EventValueErroSXidEncodingJSON,
},
})
continue
}
}
return o.getEvents(since), nil
return convertedEvents, nil
}

func (c *component) Metrics(ctx context.Context, since time.Time) ([]components.Metric, error) {
Expand All @@ -92,5 +117,11 @@ func (c *component) Metrics(ctx context.Context, since time.Time) ([]components.

func (c *component) Close() error {
log.Logger.Debugw("closing component")

if runtime.GOOS == "linux" {
_ = common_dmesg.GetDefaultLogPoller().Stop(common_dmesg.Name)
}

c.cancel()
return nil
}
47 changes: 0 additions & 47 deletions components/accelerator/nvidia/error/sxid/component_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@ import (
"errors"
"fmt"
"sort"
"strconv"
"time"

"github.com/leptonai/gpud/components"
nvidia_query_sxid "github.com/leptonai/gpud/components/accelerator/nvidia/query/sxid"
"github.com/leptonai/gpud/log"

"github.com/dustin/go-humanize"
"sigs.k8s.io/yaml"
Expand Down Expand Up @@ -115,47 +112,3 @@ func (o *Output) GetReason() Reason {
}
return reason
}

const (
EventNameErroSXid = "error_sxid"

EventKeyErroSXidUnixSeconds = "unix_seconds"
EventKeyErroSXidData = "data"
EventKeyErroSXidEncoding = "encoding"
EventValueErroSXidEncodingJSON = "json"
)

func (o *Output) getEvents(since time.Time) []components.Event {
reason := o.GetReason()

des := make([]components.Event, 0)
for i, sxidErr := range reason.Errors {
if sxidErr.Time.IsZero() {
log.Logger.Debugw("skipping event because it's too old", "sxid", sxidErr.SXid, "since", since, "event_time", sxidErr.Time.Time)
continue
}
if sxidErr.Time.Time.Before(since) {
log.Logger.Debugw("skipping event because it's too old", "sxid", sxidErr.SXid, "since", since, "event_time", sxidErr.Time.Time)
continue
}

msg := reason.Messages[i]
sxidErrBytes, _ := sxidErr.JSON()

des = append(des, components.Event{
Time: sxidErr.Time,
Name: EventNameErroSXid,
Type: components.EventTypeCritical,
Message: msg,
ExtraInfo: map[string]string{
EventKeyErroSXidUnixSeconds: strconv.FormatInt(sxidErr.Time.Unix(), 10),
EventKeyErroSXidData: string(sxidErrBytes),
EventKeyErroSXidEncoding: StateValueErrorSXidEncodingJSON,
},
})
}
if len(des) == 0 {
return nil
}
return des
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package errorxidsxid
package sxid

import (
"database/sql"
Expand Down
Loading
Loading