diff --git a/components/dmesg/filters.go b/components/dmesg/filters.go index 902794e9..dcc256d5 100644 --- a/components/dmesg/filters.go +++ b/components/dmesg/filters.go @@ -8,8 +8,7 @@ import ( ) func DefaultLogFilters(ctx context.Context) ([]*query_log_common.Filter, error) { - defaultFilters := DefaultDmesgFiltersForMemory() - defaultFilters = append(defaultFilters, DefaultDmesgFiltersForCPU()...) + defaultFilters := DefaultDmesgFiltersForCPU() defaultFilters = append(defaultFilters, DefaultDmesgFiltersForFileDescriptor()...) nvidiaInstalled, err := nvidia_query.GPUsInstalled(ctx) diff --git a/components/dmesg/filters_memory.go b/components/dmesg/filters_memory.go deleted file mode 100644 index b2e95d38..00000000 --- a/components/dmesg/filters_memory.go +++ /dev/null @@ -1,70 +0,0 @@ -package dmesg - -import ( - memory_dmesg "github.com/leptonai/gpud/components/memory/dmesg" - memory_id "github.com/leptonai/gpud/components/memory/id" - query_log_common "github.com/leptonai/gpud/components/query/log/common" - - "k8s.io/utils/ptr" -) - -const ( - // e.g., - // Out of memory: Killed process 123, UID 48, (httpd). - // - // NOTE: this is often followed by a line like: - // [Sun Dec 8 09:23:39 2024] oom_reaper: reaped process 345646 (vector), now anon-rss:0kB, file-rss:0kB, shmem-rss:0 - // (to reap the memory used by the OOM victim) - EventMemoryOOM = "memory_oom" - - // e.g., - // oom-kill:constraint=CONSTRAINT_MEMCG,nodemask=(null), - EventMemoryOOMKillConstraint = "memory_oom_kill_constraint" - - // e.g., - // postgres invoked oom-killer: gfp_mask=0x201d2, order=0, oomkilladj=0 - EventMemoryOOMKiller = "memory_oom_killer" - - // e.g., - // Memory cgroup out of memory: Killed process 123, UID 48, (httpd). - EventMemoryOOMCgroup = "memory_oom_cgroup" - - // e.g., - // [...] EDAC MC0: 1 CE memory read error - // [...] EDAC MC1: 128 CE memory read error on CPU_SrcID#1_Ha#0_Chan#1_DIMM#1 - // - // ref. - // https://serverfault.com/questions/682909/how-to-find-faulty-memory-module-from-mce-message - // https://github.com/Azure/azurehpc/blob/2d57191cb35ed638525ba9424cc2aa1b5abe1c05/experimental/aks_npd_draino/npd/deployment/node-problem-detector-config.yaml#L51C20-L51C40 - EventMemoryEDACCorrectableErrors = "memory_edac_correctable_errors" -) - -func DefaultDmesgFiltersForMemory() []*query_log_common.Filter { - return []*query_log_common.Filter{ - { - Name: EventMemoryOOM, - Regex: ptr.To(memory_dmesg.RegexOOM), - OwnerReferences: []string{memory_id.Name}, - }, - { - Name: EventMemoryOOMKillConstraint, - Regex: ptr.To(memory_dmesg.RegexOOMKillConstraint), - OwnerReferences: []string{memory_id.Name}, - }, - { - Name: EventMemoryOOMKiller, - Regex: ptr.To(memory_dmesg.RegexOOMKiller), - OwnerReferences: []string{memory_id.Name}, - }, - { - Name: EventMemoryOOMCgroup, - Regex: ptr.To(memory_dmesg.RegexOOMCgroup), - OwnerReferences: []string{memory_id.Name}, - }, - { - Name: EventMemoryEDACCorrectableErrors, - Regex: ptr.To(memory_dmesg.RegexEDACCorrectableErrors), - OwnerReferences: []string{memory_id.Name}, - }, - } -} diff --git a/components/memory/component.go b/components/memory/component.go index 85e56577..9a6eae57 100644 --- a/components/memory/component.go +++ b/components/memory/component.go @@ -5,42 +5,61 @@ import ( "context" "database/sql" "fmt" - "strconv" "time" "github.com/leptonai/gpud/components" - "github.com/leptonai/gpud/components/common" - "github.com/leptonai/gpud/components/dmesg" + events_db "github.com/leptonai/gpud/components/db" memory_id "github.com/leptonai/gpud/components/memory/id" "github.com/leptonai/gpud/components/memory/metrics" "github.com/leptonai/gpud/components/query" - query_log "github.com/leptonai/gpud/components/query/log" "github.com/leptonai/gpud/log" "github.com/prometheus/client_golang/prometheus" ) -func New(ctx context.Context, cfg Config) components.Component { +func New(ctx context.Context, cfg Config) (components.Component, error) { + eventsStore, err := events_db.NewStore( + cfg.Query.State.DBRW, + cfg.Query.State.DBRO, + events_db.CreateDefaultTableName(memory_id.Name), + 3*24*time.Hour, + ) + if err != nil { + return nil, err + } + cfg.Query.SetDefaultsIfNotSet() setDefaultPoller(cfg) cctx, ccancel := context.WithCancel(ctx) getDefaultPoller().Start(cctx, cfg.Query, memory_id.Name) - return &component{ - rootCtx: ctx, - cancel: ccancel, - poller: getDefaultPoller(), + w, err := newWatcher(cctx, eventsStore) + if err != nil { + ccancel() + return nil, err } + + return &component{ + ctx: cctx, + cancel: ccancel, + poller: getDefaultPoller(), + cfg: cfg, + watcher: w, + eventsStore: eventsStore, + }, nil } var _ components.Component = (*component)(nil) type component struct { - rootCtx context.Context - cancel context.CancelFunc - poller query.Poller - gatherer prometheus.Gatherer + ctx context.Context + cancel context.CancelFunc + poller query.Poller + cfg Config + watcher *watcher + eventsStore events_db.Store + gatherer prometheus.Gatherer } func (c *component) Name() string { return memory_id.Name } @@ -87,67 +106,8 @@ func (c *component) States(ctx context.Context) ([]components.State, error) { return output.States() } -const ( - EventKeyUnixSeconds = "unix_seconds" - EventKeyLogLine = "log_line" -) - func (c *component) Events(ctx context.Context, since time.Time) ([]components.Event, error) { - dmesgC, err := components.GetComponent(dmesg.Name) - if err != nil { - return nil, err - } - - var dmesgComponent *dmesg.Component - if o, ok := dmesgC.(interface{ Unwrap() interface{} }); ok { - if unwrapped, ok := o.Unwrap().(*dmesg.Component); ok { - dmesgComponent = unwrapped - } else { - return nil, fmt.Errorf("expected *dmesg.Component, got %T", dmesgC) - } - } - dmesgEvents, err := dmesgComponent.Events(ctx, since) - if err != nil { - return nil, err - } - - events := make([]components.Event, 0) - for _, ev := range dmesgEvents { - v, ok := ev.ExtraInfo[dmesg.EventKeyDmesgMatchedLogItem] - if !ok { - continue - } - item, err := query_log.ParseItemJSON([]byte(v)) - if err != nil || item.Matched == nil { - log.Logger.Errorw("failed to parse log item or none matched", "error", err) - continue - } - - name := "" - included := false - for _, owner := range item.Matched.OwnerReferences { - if owner != memory_id.Name { - continue - } - name = item.Matched.Name - included = true - } - if !included { - continue - } - - events = append(events, components.Event{ - Time: ev.Time, - Name: name, - Type: common.EventTypeWarning, - ExtraInfo: map[string]string{ - EventKeyUnixSeconds: strconv.FormatInt(ev.Time.Unix(), 10), - EventKeyLogLine: item.Line, - }, - }) - } - - return events, nil + return c.eventsStore.Get(ctx, since) } func (c *component) Metrics(ctx context.Context, since time.Time) ([]components.Metric, error) { @@ -182,10 +142,14 @@ func (c *component) Metrics(ctx context.Context, since time.Time) ([]components. func (c *component) Close() error { log.Logger.Debugw("closing component") + c.cancel() // safe to call stop multiple times c.poller.Stop(memory_id.Name) + c.watcher.close() + c.eventsStore.Close() + return nil } diff --git a/components/memory/dmesg/dmesg.go b/components/memory/dmesg/dmesg.go index 8816e225..17e01a33 100644 --- a/components/memory/dmesg/dmesg.go +++ b/components/memory/dmesg/dmesg.go @@ -12,21 +12,25 @@ const ( // NOTE: this is often followed by a line like: // [Sun Dec 8 09:23:39 2024] oom_reaper: reaped process 345646 (vector), now anon-rss:0kB, file-rss:0kB, shmem-rss:0 // (to reap the memory used by the OOM victim) + EventOOM = "memory_oom" RegexOOM = `Out of memory:` // e.g., // oom-kill:constraint=CONSTRAINT_MEMCG,nodemask=(null), // [...] oom-kill:constraint=CONSTRAINT_MEMCG,nodemask=(null), + EventOOMKillConstraint = "memory_oom_kill_constraint" RegexOOMKillConstraint = `oom-kill:constraint=` // e.g., // postgres invoked oom-killer: gfp_mask=0x201d2, order=0, oomkilladj=0 // [...] postgres invoked oom-killer: gfp_mask=0x201d2, order=0, oomkilladj=0 + EventOOMKiller = "memory_oom_killer" RegexOOMKiller = `(?i)\b(invoked|triggered) oom-killer\b` // e.g., // Memory cgroup out of memory: Killed process 123, UID 48, (httpd). // [...] Memory cgroup out of memory: Killed process 123, UID 48, (httpd). + EventOOMCgroup = "memory_oom_cgroup" RegexOOMCgroup = `Memory cgroup out of memory` // May indicate that Dual Inline Memory Module (DIMM) is beginning to fail. @@ -38,6 +42,7 @@ const ( // ref. // https://serverfault.com/questions/682909/how-to-find-faulty-memory-module-from-mce-message // https://github.com/Azure/azurehpc/blob/2d57191cb35ed638525ba9424cc2aa1b5abe1c05/experimental/aks_npd_draino/npd/deployment/node-problem-detector-config.yaml#L51C20-L51C40 + EventEDACCorrectableErrors = "memory_edac_correctable_errors" RegexEDACCorrectableErrors = `.*CE memory read error.*` ) @@ -85,3 +90,28 @@ func HasEDACCorrectableErrors(line string) bool { } return false } + +func Match(line string) (name string, message string) { + for _, m := range getMatches() { + if m.check(line) { + return m.name, m.message + } + } + return "", "" +} + +type match struct { + check func(string) bool + name string + message string +} + +func getMatches() []match { + return []match{ + {check: HasOOM, name: EventOOM, message: "oom detected"}, + {check: HasOOMKillConstraint, name: EventOOMKillConstraint, message: "oom kill constraint detected"}, + {check: HasOOMKiller, name: EventOOMKiller, message: "oom killer detected"}, + {check: HasOOMCgroup, name: EventOOMCgroup, message: "oom cgroup detected"}, + {check: HasEDACCorrectableErrors, name: EventEDACCorrectableErrors, message: "edac correctable errors detected"}, + } +} diff --git a/components/memory/dmesg/dmesg_test.go b/components/memory/dmesg/dmesg_test.go index 28248d5d..e5a08d28 100644 --- a/components/memory/dmesg/dmesg_test.go +++ b/components/memory/dmesg/dmesg_test.go @@ -2,6 +2,8 @@ package dmesg import ( "testing" + + "github.com/stretchr/testify/assert" ) func TestHasOOM(t *testing.T) { @@ -223,3 +225,81 @@ func TestHasEDACCorrectableErrors(t *testing.T) { }) } } + +func TestMatch(t *testing.T) { + tests := []struct { + name string + input string + expectedName string + expectedMsg string + }{ + { + name: "OOM basic case", + input: "Out of memory: Killed process 123, UID 48, (httpd).", + expectedName: EventOOM, + expectedMsg: "oom detected", + }, + { + name: "OOM with timestamp", + input: "[Sun Dec 8 09:23:39 2024] Out of memory: Killed process 123, UID 48, (httpd).", + expectedName: EventOOM, + expectedMsg: "oom detected", + }, + { + name: "OOM kill constraint", + input: "oom-kill:constraint=CONSTRAINT_MEMCG,nodemask=(null),", + expectedName: EventOOMKillConstraint, + expectedMsg: "oom kill constraint detected", + }, + { + name: "OOM killer invoked", + input: "postgres invoked oom-killer: gfp_mask=0x201d2, order=0, oomkilladj=0", + expectedName: EventOOMKiller, + expectedMsg: "oom killer detected", + }, + { + name: "OOM killer triggered", + input: "process triggered oom-killer: gfp_mask=0x201d2", + expectedName: EventOOMKiller, + expectedMsg: "oom killer detected", + }, + { + name: "OOM cgroup", + input: "Memory cgroup out of memory: Killed process 123, UID 48, (httpd).", + expectedName: EventOOMCgroup, + expectedMsg: "oom cgroup detected", + }, + { + name: "EDAC correctable error", + input: "EDAC MC0: 1 CE memory read error", + expectedName: EventEDACCorrectableErrors, + expectedMsg: "edac correctable errors detected", + }, + { + name: "EDAC correctable error with DIMM info", + input: "EDAC MC1: 128 CE memory read error on CPU_SrcID#1_Ha#0_Chan#1_DIMM#1", + expectedName: EventEDACCorrectableErrors, + expectedMsg: "edac correctable errors detected", + }, + { + name: "non-matching line", + input: "some random log line that doesn't match any patterns", + expectedName: "", + expectedMsg: "", + }, + { + name: "empty line", + input: "", + expectedName: "", + expectedMsg: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + name, msg := Match(tt.input) + assert.Equal(t, tt.expectedName, name) + assert.Equal(t, tt.expectedMsg, msg) + }) + } +} diff --git a/components/memory/testdata/dmesg.decode.iso.log b/components/memory/testdata/dmesg.decode.iso.log new file mode 100644 index 00000000..512cbd8e --- /dev/null +++ b/components/memory/testdata/dmesg.decode.iso.log @@ -0,0 +1,24 @@ +kern :warn : 2025-01-21T04:41:44,283302+00:00 NVRM: Xid (PCI:0000:38:00): 13, pid='', name=, Graphics SM Global Exception on (GPC 8, TPC 5, SM 1): Multiple Warp Errors +kern :warn : 2025-01-21T04:41:44,283390+00:00 NVRM: Xid (PCI:0000:38:00): 13, pid='', name=, Graphics Exception: ESR 0x546fb0=0x11f000e 0x546fb4=0x24 0x546fa8=0xf81eb60 0x546fac=0x1174 +kern :warn : 2025-01-21T04:41:44,283575+00:00 NVRM: Xid (PCI:0000:38:00): 13, pid='', name=, Graphics SM Warp Exception on (GPC 9, TPC 1, SM 1): Out Of Range Address +kern :warn : 2025-01-21T04:41:44,283671+00:00 NVRM: Xid (PCI:0000:38:00): 13, pid='', name=, Graphics SM Global Exception on (GPC 9, TPC 1, SM 1): Multiple Warp Errors +kern :warn : 2025-01-21T04:41:44,283758+00:00 NVRM: Xid (PCI:0000:38:00): 13, pid='', name=, Graphics Exception: ESR 0x54cfb0=0x11f000e 0x54cfb4=0x24 0x54cfa8=0xf81eb60 0x54cfac=0x1174 +kern :warn : 2025-01-21T04:41:44,283937+00:00 NVRM: Xid (PCI:0000:38:00): 13, pid='', name=, Graphics SM Warp Exception on (GPC 9, TPC 2, SM 0): Out Of Range Address +kern :warn : 2025-01-21T04:41:44,284042+00:00 NVRM: Xid (PCI:0000:38:00): 13, pid='', name=, Graphics Exception: ESR 0x54d730=0x11c000e 0x54d734=0x20 0x54d728=0xf81eb60 0x54d72c=0x1174 +kern :warn : 2025-01-21T04:41:44,284250+00:00 NVRM: Xid (PCI:0000:38:00): 13, pid='', name=, Graphics SM Warp Exception on (GPC 9, TPC 5, SM 0): Out Of Range Address +kern :warn : 2025-01-21T04:41:44,284345+00:00 NVRM: Xid (PCI:0000:38:00): 13, pid='', name=, Graphics SM Global Exception on (GPC 9, TPC 5, SM 0): Multiple Warp Errors +kern :warn : 2025-01-21T04:41:44,284433+00:00 NVRM: Xid (PCI:0000:38:00): 13, pid='', name=, Graphics Exception: ESR 0x54ef30=0x117000e 0x54ef34=0x24 0x54ef28=0xf81eb60 0x54ef2c=0x1174 +kern :warn : 2025-01-21T04:41:44,284616+00:00 NVRM: Xid (PCI:0000:38:00): 13, pid='', name=, Graphics SM Warp Exception on (GPC 10, TPC 1, SM 1): Out Of Range Address +kern :warn : 2025-01-21T04:41:44,284705+00:00 NVRM: Xid (PCI:0000:38:00): 13, pid='', name=, Graphics SM Global Exception on (GPC 10, TPC 1, SM 1): Multiple Warp Errors +kern :warn : 2025-01-21T04:41:44,284792+00:00 NVRM: Xid (PCI:0000:38:00): 13, pid='', name=, Graphics Exception: ESR 0x554fb0=0x119000e 0x554fb4=0x24 0x554fa8=0xf81eb60 0x554fac=0x1174 +kern :warn : 2025-01-21T04:41:44,284971+00:00 NVRM: Xid (PCI:0000:38:00): 13, pid='', name=, Graphics SM Warp Exception on (GPC 10, TPC 4, SM 0): Out Of Range Address +kern :warn : 2025-01-21T04:41:44,285060+00:00 NVRM: Xid (PCI:0000:38:00): 13, pid='', name=, Graphics SM Global Exception on (GPC 10, TPC 4, SM 0): Multiple Warp Errors +kern :warn : 2025-01-21T04:41:44,285147+00:00 NVRM: Xid (PCI:0000:38:00): 13, pid='', name=, Graphics Exception: ESR 0x556730=0x117000e 0x556734=0x24 0x556728=0xf81eb60 0x55672c=0x1174 +kern :warn : 2025-01-21T04:41:44,287197+00:00 NVRM: Xid (PCI:0000:38:00): 43, pid=2924364, name=pt_main_thread, Ch 00000008 +kern :warn : 2025-01-21T08:41:30,287197+00:00 oom_reaper: reaped process 345646 (vector), now anon-rss:0kB, file-rss:0kB, shmem-rss:0 +kern :warn : 2025-01-21T08:42:30,287197+00:00 oom-kill:constraint=CONSTRAINT_MEMCG,nodemask=(null), +kern :warn : 2025-01-21T08:43:30,287197+00:00 Out of memory: Killed process 123, UID 48, (httpd) +kern :warn : 2025-01-21T08:44:30,287197+00:00 Out of memory: Kill process 456 (python) score 50 or sacrifice child +kern :warn : 2025-01-21T08:45:30,287197+00:00 Out of memory: Killed process 123, UID 48, (httpd). +kern :warn : 2025-01-21T08:46:30,287197+00:00 oom-kill:constraint=CONSTRAINT_MEMCG,nodemask=(null),cpuset=cri-containerd-3fc28fa1c647ceede9f6b340d0b16c9f1f663698972d22a52e296f291638e014.scope,mems_allowed=0-1,oom_memcg=/lxc.payload.ny2g2r5hh3-lxc/kubepods.slice/kubepods-burstable.slice/kubepods-burstable-poda8697f49_441d_4d4f_90d2_6d8e1fa3bbe7.slice,task_memcg=/lxc.payload.ny2g2r5hh3-lxc/kubepods.slice/kubepods-burstable.slice/kubepods-burstable-poda8697f49_441d_4d4f_90d2_6d8e1fa3bbe7.slice/cri-containerd-3fc28fa1c647ceede9f6b340d0b16c9f1f663698972d22a52e296f291638e014.scope,task=node,pid=863987,uid=0 +kern :warn : 2025-01-21T08:47:30,287197+00:00 oom-kill:constraint=OTHER_CONSTRAINT \ No newline at end of file diff --git a/components/memory/watcher.go b/components/memory/watcher.go new file mode 100644 index 00000000..e6ecbce8 --- /dev/null +++ b/components/memory/watcher.go @@ -0,0 +1,101 @@ +package memory + +import ( + "context" + "sync" + "time" + + "github.com/leptonai/gpud/components" + "github.com/leptonai/gpud/components/common" + events_db "github.com/leptonai/gpud/components/db" + memory_dmesg "github.com/leptonai/gpud/components/memory/dmesg" + "github.com/leptonai/gpud/log" + pkg_dmesg "github.com/leptonai/gpud/pkg/dmesg" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +type watcher struct { + ctx context.Context + eventsStore events_db.Store + + closeOnce sync.Once + dmesgWatcher pkg_dmesg.Watcher +} + +func newWatcher(ctx context.Context, eventsStore events_db.Store) (*watcher, error) { + dw, err := pkg_dmesg.NewWatcher() + if err != nil { + return nil, err + } + + w := &watcher{ + ctx: ctx, + eventsStore: eventsStore, + dmesgWatcher: dw, + } + go w.watch() + + return w, nil +} + +const EventKeyLogLine = "log_line" + +func (w *watcher) watch() { + ch := w.dmesgWatcher.Watch() + for { + select { + case <-w.ctx.Done(): + return + case line, open := <-ch: + if !open { + return + } + if line.IsEmpty() { + continue + } + + ev := components.Event{ + Time: metav1.Time{Time: line.Timestamp.UTC()}, + Type: common.EventTypeWarning, + ExtraInfo: map[string]string{ + EventKeyLogLine: line.Content, + }, + } + + ev.Name, ev.Message = memory_dmesg.Match(line.Content) + if ev.Name == "" { + continue + } + + cctx, ccancel := context.WithTimeout(w.ctx, 15*time.Second) + found, err := w.eventsStore.Find(cctx, components.Event{ + Time: ev.Time, + Name: ev.Name, + Type: ev.Type, + }) + ccancel() + if err != nil { + log.Logger.Errorw("failed to find event", "eventName", ev.Name, "eventType", ev.Type, "error", err) + } + if found != nil { + continue + } + + cctx, ccancel = context.WithTimeout(w.ctx, 15*time.Second) + err = w.eventsStore.Insert(cctx, ev) + ccancel() + if err != nil { + log.Logger.Errorw("failed to insert event", "error", err) + } else { + log.Logger.Infow("successfully inserted event", "event", ev.Name) + } + } + } +} + +func (w *watcher) close() { + w.closeOnce.Do(func() { + w.dmesgWatcher.Close() + }) +} diff --git a/components/memory/watcher_test.go b/components/memory/watcher_test.go new file mode 100644 index 00000000..899cda57 --- /dev/null +++ b/components/memory/watcher_test.go @@ -0,0 +1,63 @@ +package memory + +import ( + "context" + "strings" + "sync" + "testing" + "time" + + events_db "github.com/leptonai/gpud/components/db" + pkg_dmesg "github.com/leptonai/gpud/pkg/dmesg" + "github.com/leptonai/gpud/pkg/sqlite" +) + +func TestWatcher(t *testing.T) { + dmesgWatcher, err := pkg_dmesg.NewWatcherWithCommands([][]string{ + { + "cat ./testdata/dmesg.decode.iso.log", + }, + }) + if err != nil { + t.Fatalf("failed to create dmesg watcher: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dbRW, dbRO, cleanup := sqlite.OpenTestDB(t) + defer cleanup() + + eventsStore, err := events_db.NewStore(dbRW, dbRO, "test", 0) + if err != nil { + t.Fatalf("failed to create events store: %v", err) + } + defer eventsStore.Close() + + w := &watcher{ + ctx: ctx, + closeOnce: sync.Once{}, + eventsStore: eventsStore, + dmesgWatcher: dmesgWatcher, + } + go w.watch() + defer w.close() + + time.Sleep(5 * time.Second) + + events, err := eventsStore.Get(ctx, time.Unix(0, 0)) + if err != nil { + t.Fatalf("failed to get events: %v", err) + } + + if len(events) == 0 { + t.Skip("no events found") // slow CI... + } + + t.Logf("found %d events", len(events)) + for _, ev := range events { + if !strings.Contains(ev.Name, "memory_") { + t.Fatalf("unexpected event type: %s", ev.Name) + } + } +} diff --git a/internal/server/server.go b/internal/server/server.go index bb3401c5..f9111ecf 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -670,7 +670,11 @@ func New(ctx context.Context, config *lepconfig.Config, endpoint string, cliUID if err := cfg.Validate(); err != nil { return nil, fmt.Errorf("failed to validate component %s config: %w", k, err) } - allComponents = append(allComponents, memory.New(ctx, cfg)) + c, err := memory.New(ctx, cfg) + if err != nil { + return nil, fmt.Errorf("failed to create component %s: %w", k, err) + } + allComponents = append(allComponents, c) case os_id.Name: cfg := os.Config{Query: defaultQueryCfg} diff --git a/pkg/dmesg/watcher.go b/pkg/dmesg/watcher.go index 03375c41..d5b6c1f0 100644 --- a/pkg/dmesg/watcher.go +++ b/pkg/dmesg/watcher.go @@ -32,6 +32,10 @@ type LogLine struct { Error string } +func (l LogLine) IsEmpty() bool { + return l.Timestamp.IsZero() && l.Facility == "" && l.Level == "" && l.Content == "" && l.Error == "" +} + type Watcher interface { // Watch returns a channel that emits log lines. // The channel is closed on (1) process exit, (2) on calling "Close" method @@ -42,10 +46,10 @@ type Watcher interface { } func NewWatcher() (Watcher, error) { - return newWatcher(DefaultWatchCommands) + return NewWatcherWithCommands(DefaultWatchCommands) } -func newWatcher(cmds [][]string) (Watcher, error) { +func NewWatcherWithCommands(cmds [][]string) (Watcher, error) { if len(cmds) == 0 { return nil, errors.New("no commands provided") } @@ -145,8 +149,6 @@ func read(ctx context.Context, p process.Process, ch chan<- LogLine) { } } -const isoFormat = "2006-01-02T15:04:05,999999-07:00" - // parses the timestamp from "dmesg --time-format=iso" output lines. // "The definition of the iso timestamp is: YYYY-MM-DDHH:MM:SS,←+>." func parseDmesgLine(line string) LogLine { @@ -195,7 +197,11 @@ func parseDmesgLine(line string) LogLine { return logLine } -var isoTsRegex = regexp.MustCompile(`\d{4}-(0[1-9]|1[0-2])-(0[1-9]|[12]\d|3[01])T([01]\d|2[0-3]):([0-5]\d):([0-5]\d)`) +const isoFormat = "2006-01-02T15:04:05,999999-07:00" + +// shorter "2025-01-17T15:36:11" should not match +// only "2025-01-17T15:36:17,304997+00:00" should match +var isoTsRegex = regexp.MustCompile(`\d{4}-(0[1-9]|1[0-2])-(0[1-9]|[12]\d|3[01])T([01]\d|2[0-3]):([0-5]\d):([0-5]\d),\d{6}[+-]\d{2}:\d{2}`) func findISOTimestampIndex(s string) int { loc := isoTsRegex.FindStringIndex(s) diff --git a/pkg/dmesg/watcher_test.go b/pkg/dmesg/watcher_test.go index 710b546a..b3150820 100644 --- a/pkg/dmesg/watcher_test.go +++ b/pkg/dmesg/watcher_test.go @@ -2,13 +2,15 @@ package dmesg import ( "context" + "os" + "path/filepath" "strings" "testing" "time" ) func TestWatch(t *testing.T) { - w, err := newWatcher([][]string{{"echo 123"}}) + w, err := NewWatcherWithCommands([][]string{{"echo 123"}}) if err != nil { t.Fatalf("failed to create watcher: %v", err) } @@ -24,7 +26,7 @@ func TestWatch(t *testing.T) { func TestWatchDmesgLogs(t *testing.T) { // sleep 5 seconds to stream the whole file before command exit - w, err := newWatcher([][]string{ + w, err := NewWatcherWithCommands([][]string{ {"cat testdata/dmesg.decode.iso.log.0"}, {"cat testdata/dmesg.decode.iso.log.1"}, {"sleep 7"}, @@ -131,7 +133,7 @@ func TestParseDmesgLine(t *testing.T) { } func TestWatcherClose(t *testing.T) { - w, err := newWatcher([][]string{ + w, err := NewWatcherWithCommands([][]string{ {"sleep 10"}, }) if err != nil { @@ -179,7 +181,7 @@ func TestWatcherError(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, err := newWatcher(tt.cmds) + _, err := NewWatcherWithCommands(tt.cmds) if (err != nil) != tt.wantErr { t.Errorf("newWatcher() error = %v, wantErr %v", err, tt.wantErr) } @@ -212,9 +214,39 @@ func TestFindISOTimestampIndex(t *testing.T) { input string want int }{ + { + name: "invalid month/hour", + input: "2023-13-01T25:00:00 invalid date", + want: -1, + }, { name: "valid timestamp", + input: "kern :info : 2025-01-17T15:36:17,173085+00:00", + want: 15, + }, + { + name: "invalid timestamp with message", + input: " 2024-02-29T15:30:00 some message", + want: -1, + }, + { + name: "no timestamp", + input: "no timestamp here", + want: -1, + }, + { + name: "shorter timestamp", + input: "kern :info : 2025-01-17T15:36:11", + want: -1, + }, + { + name: "valid timestamp but shorter", input: "prefix 2024-01-21T04:41:44 suffix", + want: -1, + }, + { + name: "valid timestamp", + input: "prefix 2025-01-17T15:36:17,173085+00:00 suffix", want: 7, }, { @@ -233,14 +265,14 @@ func TestFindISOTimestampIndex(t *testing.T) { want: -1, }, { - name: "timestamp at start", + name: "timestamp at start but shorter", input: "2024-01-21T04:41:44 message", - want: 0, + want: -1, }, { - name: "multiple timestamps", + name: "multiple timestamps but shorter", input: "2024-01-21T04:41:44 and 2024-01-21T04:41:45", - want: 0, + want: -1, }, } @@ -254,10 +286,47 @@ func TestFindISOTimestampIndex(t *testing.T) { } } +func TestFindTimestampIndexFromFiles(t *testing.T) { + t.Parallel() + + dir, err := os.ReadDir("testdata") + if err != nil { + t.Fatalf("failed to read testdata dir: %v", err) + } + + for _, entry := range dir { + if entry.IsDir() { + continue + } + + b, err := os.ReadFile(filepath.Join("testdata", entry.Name())) + if err != nil { + t.Fatalf("failed to read file: %v", err) + } + lines := strings.Split(string(b), "\n") + for _, line := range lines { + if len(line) == 0 { + continue + } + + idx := findISOTimestampIndex(line) + if idx == -1 { + t.Logf("file %s: %d %q", entry.Name(), idx, line) + } + + // should never happen + if idx != -1 && len(line) < len(isoFormat) { + t.Errorf("file %s: %d %q", entry.Name(), len(line), line) + } + } + } + +} + func TestWatchMultipleCommands(t *testing.T) { // wait for some time to be read // slow CI - w, err := newWatcher( + w, err := NewWatcherWithCommands( [][]string{ {"echo 'first command'"}, {"echo 'second command'"}, @@ -286,7 +355,7 @@ func TestWatchMultipleCommands(t *testing.T) { } func TestWatchWithError(t *testing.T) { - w, err := newWatcher([][]string{ + w, err := NewWatcherWithCommands([][]string{ {"cat nonexistentfile"}, }) if err != nil { @@ -307,48 +376,3 @@ func TestWatchWithError(t *testing.T) { t.Error("expected to see an error line") } } - -func TestFindTimestampIndex(t *testing.T) { - t.Parallel() - - testCases := []struct { - input string - expected int - }{ - { - input: "2025-01-21T04:41:44 some message", - expected: 0, - }, - { - input: "prefix 2023-12-31T23:59:59 some message", - expected: 7, - }, - { - input: " 2024-02-29T15:30:00 some message", - expected: 2, - }, - { - input: "no timestamp here", - expected: -1, - }, - { - input: "", - expected: -1, - }, - { - input: "2023-13-01T25:00:00 invalid date", // invalid month/hour - expected: -1, - }, - { - input: "partial 2023-01-01 timestamp", // missing time part - expected: -1, - }, - } - - for _, tc := range testCases { - idx := findISOTimestampIndex(tc.input) - if idx != tc.expected { - t.Errorf("Expected index %d for input %q, got %d", tc.expected, tc.input, idx) - } - } -}