diff --git a/components/db/events.go b/components/db/events.go new file mode 100644 index 00000000..5670fb70 --- /dev/null +++ b/components/db/events.go @@ -0,0 +1,447 @@ +package db + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + "strings" + "time" + + "github.com/leptonai/gpud/components" + "github.com/leptonai/gpud/components/common" + "github.com/leptonai/gpud/log" + "github.com/leptonai/gpud/pkg/sqlite" + + _ "github.com/mattn/go-sqlite3" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const schemaVersion = "v0_4_0" + +// Creates the default table name for the component. +// The table name is in the format of "components_{component_name}_events_v0_4_0". +// Suffix with the version, in case we change the table schema. +func CreateDefaultTableName(componentName string) string { + c := strings.ReplaceAll(componentName, " ", "_") + c = strings.ReplaceAll(c, "-", "_") + c = strings.ReplaceAll(c, "__", "_") + c = strings.ToLower(c) + tableName := fmt.Sprintf("components_%s_events_%s", c, schemaVersion) + return tableName +} + +const ( + // Event timestamp in unix seconds. + ColumnTimestamp = "timestamp" + + // event name + // e.g., "memory_oom", "memory_oom_kill_constraint", "memory_oom_cgroup", "memory_edac_correctable_errors". + ColumnName = "name" + + // event type + // e.g., "Unknown", "Info", "Warning", "Critical", "Fatal". + ColumnType = "type" + + // event message + // e.g., "VFS file-max limit reached" + ColumnMessage = "message" + + // event extra info + // e.g., + // data source: "dmesg", "nvml", "nvidia-smi". + // event target: "gpu_id", "gpu_uuid". + // log detail: "oom_reaper: reaped process 345646 (vector), now anon-rss:0kB, file-rss:0kB, shmem-rss:0". + ColumnExtraInfo = "extra_info" + + // event suggested actions + // e.g., "reboot" + ColumnSuggestedActions = "suggested_actions" +) + +type storeImpl struct { + rootCtx context.Context + rootCancel context.CancelFunc + + table string + dbRW *sql.DB + dbRO *sql.DB + retention time.Duration +} + +var ( + ErrNoDBRWSet = errors.New("no writable db set") + ErrNoDBROSet = errors.New("no read-only db set") +) + +type Store interface { + Insert(ctx context.Context, ev components.Event) error + Find(ctx context.Context, ev components.Event) (*components.Event, error) + Get(ctx context.Context, since time.Time) ([]components.Event, error) + Purge(ctx context.Context, beforeTimestamp int64) (int, error) + Close() +} + +var _ Store = (*storeImpl)(nil) + +// Creates a new DB instance with the table created. +// Requires write-only and read-only instances for minimize conflicting writes/reads. +// ref. 
https://github.com/mattn/go-sqlite3/issues/1179#issuecomment-1638083995 +func NewStore(dbRW *sql.DB, dbRO *sql.DB, tableName string, retention time.Duration) (Store, error) { + if dbRW == nil { + return nil, ErrNoDBRWSet + } + if dbRO == nil { + return nil, ErrNoDBROSet + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + err := createTable(ctx, dbRW, tableName) + cancel() + if err != nil { + return nil, err + } + + rootCtx, rootCancel := context.WithCancel(context.Background()) + s := &storeImpl{ + rootCtx: rootCtx, + rootCancel: rootCancel, + table: tableName, + dbRW: dbRW, + dbRO: dbRO, + retention: retention, + } + go s.runPurge() + + return s, nil +} + +func (s *storeImpl) runPurge() { + if s.retention < time.Second { + return + } + + // actual check interval should be lower than the retention period + // in case of GPUd restarts + checkInterval := s.retention / 5 + if checkInterval < time.Second { + checkInterval = time.Second + } + + log.Logger.Infow("start purging", "table", s.table, "retention", s.retention) + for { + select { + case <-s.rootCtx.Done(): + return + case <-time.After(checkInterval): + } + + now := time.Now().UTC() + purged, err := s.Purge(s.rootCtx, now.Add(-s.retention).Unix()) + if err != nil { + log.Logger.Errorw("failed to purge data", "table", s.table, "retention", s.retention, "error", err) + } else { + log.Logger.Infow("purged data", "table", s.table, "retention", s.retention, "purged", purged) + } + } +} + +func (s *storeImpl) Close() { + log.Logger.Infow("closing the store", "table", s.table) + if s.rootCancel != nil { + s.rootCancel() + } +} + +func (s *storeImpl) Insert(ctx context.Context, ev components.Event) error { + return insertEvent(ctx, s.dbRW, s.table, ev) +} + +func (s *storeImpl) Find(ctx context.Context, ev components.Event) (*components.Event, error) { + return findEvent(ctx, s.dbRO, s.table, ev) +} + +// Returns the event in the descending order of timestamp (latest event first). 
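+// The since timestamp is exclusive: only events strictly newer than it (at unix-second granularity) are returned.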
+func (s *storeImpl) Get(ctx context.Context, since time.Time) ([]components.Event, error) { + return getEvents(ctx, s.dbRO, s.table, since) +} + +func (s *storeImpl) Purge(ctx context.Context, beforeTimestamp int64) (int, error) { + return purgeEvents(ctx, s.dbRW, s.table, beforeTimestamp) +} + +func createTable(ctx context.Context, db *sql.DB, tableName string) error { + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return err + } + + // create table + _, err = tx.ExecContext(ctx, fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s ( + %s INTEGER NOT NULL, + %s TEXT NOT NULL, + %s TEXT NOT NULL, + %s TEXT, + %s TEXT, + %s TEXT +);`, tableName, + ColumnTimestamp, + ColumnName, + ColumnType, + ColumnMessage, + ColumnExtraInfo, + ColumnSuggestedActions, + )) + if err != nil { + _ = tx.Rollback() + return err + } + + _, err = tx.ExecContext(ctx, fmt.Sprintf(`CREATE INDEX IF NOT EXISTS idx_%s_%s ON %s(%s);`, + tableName, ColumnTimestamp, tableName, ColumnTimestamp)) + if err != nil { + _ = tx.Rollback() + return err + } + + _, err = tx.ExecContext(ctx, fmt.Sprintf(`CREATE INDEX IF NOT EXISTS idx_%s_%s ON %s(%s);`, + tableName, ColumnName, tableName, ColumnName)) + if err != nil { + _ = tx.Rollback() + return err + } + + _, err = tx.ExecContext(ctx, fmt.Sprintf(`CREATE INDEX IF NOT EXISTS idx_%s_%s ON %s(%s);`, + tableName, ColumnType, tableName, ColumnType)) + if err != nil { + _ = tx.Rollback() + return err + } + + return tx.Commit() +} + +func insertEvent(ctx context.Context, db *sql.DB, tableName string, ev components.Event) error { + start := time.Now() + extraInfoJSON, err := json.Marshal(ev.ExtraInfo) + if err != nil { + return fmt.Errorf("failed to marshal extra info: %w", err) + } + suggestedActionsJSON, err := json.Marshal(ev.SuggestedActions) + if err != nil { + return fmt.Errorf("failed to marshal suggested actions: %w", err) + } + + _, err = db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (%s, %s, %s, %s, %s, %s) VALUES (?, ?, ?, NULLIF(?, ''), NULLIF(?, ''), NULLIF(?, ''))", + tableName, + ColumnTimestamp, + ColumnName, + ColumnType, + ColumnMessage, + ColumnExtraInfo, + ColumnSuggestedActions, + ), + ev.Time.Unix(), + ev.Name, + ev.Type, + ev.Message, + string(extraInfoJSON), + string(suggestedActionsJSON), + ) + sqlite.RecordInsertUpdate(time.Since(start).Seconds()) + + return err +} + +func findEvent(ctx context.Context, db *sql.DB, tableName string, ev components.Event) (*components.Event, error) { + selectStatement := fmt.Sprintf(` +SELECT %s, %s, %s, %s, %s, %s FROM %s WHERE %s = ? AND %s = ? 
AND %s = ?`, + ColumnTimestamp, + ColumnName, + ColumnType, + ColumnMessage, + ColumnExtraInfo, + ColumnSuggestedActions, + tableName, + ColumnTimestamp, + ColumnName, + ColumnType, + ) + if ev.Message != "" { + selectStatement += fmt.Sprintf(" AND %s = ?", ColumnMessage) + } + if len(ev.ExtraInfo) > 0 { + selectStatement += fmt.Sprintf(" AND %s = ?", ColumnExtraInfo) + } + if ev.SuggestedActions != nil { + selectStatement += fmt.Sprintf(" AND %s = ?", ColumnSuggestedActions) + } + + params := []any{ev.Time.Unix(), ev.Name, ev.Type} + if ev.Message != "" { + params = append(params, ev.Message) + } + if len(ev.ExtraInfo) > 0 { + extraInfoJSON, err := json.Marshal(ev.ExtraInfo) + if err != nil { + return nil, fmt.Errorf("failed to marshal extra info: %w", err) + } + params = append(params, string(extraInfoJSON)) + } + if ev.SuggestedActions != nil { + suggestedActionsJSON, err := json.Marshal(ev.SuggestedActions) + if err != nil { + return nil, fmt.Errorf("failed to marshal suggested actions: %w", err) + } + params = append(params, string(suggestedActionsJSON)) + } + + row := db.QueryRowContext(ctx, selectStatement, params...) + foundEvent, err := scanRow(row) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, err + } + + return &foundEvent, nil +} + +// Returns the event in the descending order of timestamp (latest event first). +func getEvents(ctx context.Context, db *sql.DB, tableName string, since time.Time) ([]components.Event, error) { + query := fmt.Sprintf(`SELECT %s, %s, %s, %s, %s, %s +FROM %s +WHERE %s > ? +ORDER BY %s DESC`, + ColumnTimestamp, ColumnName, ColumnType, ColumnMessage, ColumnExtraInfo, ColumnSuggestedActions, + tableName, + ColumnTimestamp, + ColumnTimestamp, + ) + params := []any{since.UTC().Unix()} + + start := time.Now() + rows, err := db.QueryContext(ctx, query, params...) 
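+	// Record the select latency before the error check so that failed queries
+	// are counted in the latency metric as well.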
+ sqlite.RecordSelect(time.Since(start).Seconds()) + + if err != nil { + if err == sql.ErrNoRows { + return nil, nil + } + return nil, err + } + defer rows.Close() + + events := []components.Event{} + for rows.Next() { + event, err := scanRows(rows) + if err != nil { + return nil, err + } + events = append(events, event) + } + if len(events) == 0 { + return nil, nil + } + return events, nil +} + +func scanRow(row *sql.Row) (components.Event, error) { + var event components.Event + var timestamp int64 + var msg sql.NullString + var extraInfo sql.NullString + var suggestedActions sql.NullString + err := row.Scan( + ×tamp, + &event.Name, + &event.Type, + &msg, + &extraInfo, + &suggestedActions, + ) + if err != nil { + return event, err + } + + event.Time = metav1.Time{Time: time.Unix(timestamp, 0)} + if msg.Valid { + event.Message = msg.String + } + if extraInfo.Valid && len(extraInfo.String) > 0 && extraInfo.String != "null" { + var extraInfoMap map[string]string + if err := json.Unmarshal([]byte(extraInfo.String), &extraInfoMap); err != nil { + return event, fmt.Errorf("failed to unmarshal extra info: %w", err) + } + event.ExtraInfo = extraInfoMap + } + if suggestedActions.Valid && len(suggestedActions.String) > 0 && suggestedActions.String != "null" { + var suggestedActionsObj common.SuggestedActions + if err := json.Unmarshal([]byte(suggestedActions.String), &suggestedActionsObj); err != nil { + return event, fmt.Errorf("failed to unmarshal suggested actions: %w", err) + } + event.SuggestedActions = &suggestedActionsObj + } + return event, nil +} + +func scanRows(rows *sql.Rows) (components.Event, error) { + var event components.Event + var timestamp int64 + var msg sql.NullString + var extraInfo sql.NullString + var suggestedActions sql.NullString + err := rows.Scan( + ×tamp, + &event.Name, + &event.Type, + &msg, + &extraInfo, + &suggestedActions, + ) + if err != nil { + return event, err + } + + event.Time = metav1.Time{Time: time.Unix(timestamp, 0)} + if msg.Valid { + event.Message = msg.String + } + if extraInfo.Valid { + var extraInfoMap map[string]string + if err := json.Unmarshal([]byte(extraInfo.String), &extraInfoMap); err != nil { + return event, fmt.Errorf("failed to unmarshal extra info: %w", err) + } + event.ExtraInfo = extraInfoMap + } + if suggestedActions.Valid && suggestedActions.String != "" { + var suggestedActionsObj common.SuggestedActions + if err := json.Unmarshal([]byte(suggestedActions.String), &suggestedActionsObj); err != nil { + return event, fmt.Errorf("failed to unmarshal suggested actions: %w", err) + } + event.SuggestedActions = &suggestedActionsObj + } + return event, nil +} + +func purgeEvents(ctx context.Context, db *sql.DB, tableName string, beforeTimestamp int64) (int, error) { + deleteStatement := fmt.Sprintf(`DELETE FROM %s WHERE %s < ?`, + tableName, + ColumnTimestamp, + ) + + start := time.Now() + rs, err := db.ExecContext(ctx, deleteStatement, beforeTimestamp) + if err != nil { + return 0, err + } + sqlite.RecordDelete(time.Since(start).Seconds()) + + affected, err := rs.RowsAffected() + if err != nil { + return 0, err + } + return int(affected), nil +} diff --git a/components/db/events_benchmark_test.go b/components/db/events_benchmark_test.go new file mode 100644 index 00000000..a4c2a346 --- /dev/null +++ b/components/db/events_benchmark_test.go @@ -0,0 +1,77 @@ +package db + +import ( + "context" + "fmt" + "os" + "testing" + "time" + + "github.com/leptonai/gpud/components" + "github.com/leptonai/gpud/components/common" + 
"github.com/leptonai/gpud/pkg/sqlite" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/dustin/go-humanize" + "github.com/stretchr/testify/assert" +) + +// BENCHMARK=true go test -v -run=TestSimulatedEvents -timeout=10m +func TestSimulatedEvents(t *testing.T) { + if os.Getenv("BENCHMARK") != "true" { + t.Skip("skipping benchmark test") + } + + t.Parallel() + + dbRW, dbRO, cleanup := sqlite.OpenTestDB(t) + defer cleanup() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tableName := CreateDefaultTableName("hello") + store, err := NewStore(dbRW, dbRO, tableName, 0) + assert.NoError(t, err) + defer store.Close() + daysToIngest := 3 + eventsN := daysToIngest * 24 * 60 * 60 + + now := time.Now() + for i := 0; i < eventsN; i++ { + ev := components.Event{ + Time: metav1.Time{Time: now.Add(time.Duration(i) * time.Minute)}, + Name: "test", + Type: common.EventTypeWarning, + Message: "Test message with normal text", + ExtraInfo: map[string]string{ + "a": fmt.Sprintf("%d", i), + }, + SuggestedActions: &common.SuggestedActions{ + RepairActions: []common.RepairActionType{ + common.RepairActionTypeIgnoreNoActionRequired, + }, + }, + } + if err := store.Insert(ctx, ev); err != nil { + t.Fatalf("failed to insert event: %v", err) + } + } + t.Logf("ingested %d events", eventsN) + + size, err := sqlite.ReadDBSize(ctx, dbRO) + if err != nil { + t.Fatalf("failed to read db size: %v", err) + } + t.Logf("db size: %s", humanize.Bytes(size)) // 361 M + + if err := sqlite.Compact(ctx, dbRW); err != nil { + t.Fatalf("failed to compact db: %v", err) + } + + size, err = sqlite.ReadDBSize(ctx, dbRO) + if err != nil { + t.Fatalf("failed to read db size: %v", err) + } + t.Logf("db size: %s", humanize.Bytes(size)) // 341 MB +} diff --git a/components/db/events_test.go b/components/db/events_test.go new file mode 100644 index 00000000..c8ce26f6 --- /dev/null +++ b/components/db/events_test.go @@ -0,0 +1,1381 @@ +package db + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "sync" + "testing" + "time" + + "github.com/leptonai/gpud/components" + "github.com/leptonai/gpud/components/common" + "github.com/leptonai/gpud/pkg/sqlite" + + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestCreateDefaultTableName(t *testing.T) { + t.Parallel() + + testCases := []struct { + name string + input string + expected string + }{ + { + name: "simple name", + input: "test", + expected: fmt.Sprintf("components_test_events_%s", schemaVersion), + }, + { + name: "name with spaces", + input: "test component", + expected: fmt.Sprintf("components_test_component_events_%s", schemaVersion), + }, + { + name: "name with hyphens", + input: "test-component", + expected: fmt.Sprintf("components_test_component_events_%s", schemaVersion), + }, + { + name: "mixed case", + input: "TestComponent", + expected: fmt.Sprintf("components_testcomponent_events_%s", schemaVersion), + }, + { + name: "complex name", + input: "Test Component-Name", + expected: fmt.Sprintf("components_test_component_name_events_%s", schemaVersion), + }, + { + name: "empty string", + input: "", + expected: fmt.Sprintf("components__events_%s", schemaVersion), + }, + { + name: "multiple spaces and hyphens", + input: "test component--name", + expected: fmt.Sprintf("components_test_component_name_events_%s", schemaVersion), + }, + } + + for _, tc := range testCases { + tc := tc // capture range variable + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + result := 
CreateDefaultTableName(tc.input) + assert.Equal(t, tc.expected, result) + }) + } +} + +func TestTableInsertsReads(t *testing.T) { + t.Parallel() + + testTableName := "test_table" + + dbRW, dbRO, cleanup := sqlite.OpenTestDB(t) + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + store, err := NewStore(dbRW, dbRO, testTableName, 0) + assert.NoError(t, err) + defer store.Close() + + first := time.Now().UTC() + + events := []components.Event{} + eventsN := 10 + for i := 0; i < eventsN; i++ { + events = append(events, components.Event{ + Time: metav1.Time{Time: first.Add(time.Duration(i) * time.Second)}, + Name: "dmesg", + Type: common.EventTypeWarning, + Message: fmt.Sprintf("OOM event %d occurred", i), + SuggestedActions: &common.SuggestedActions{ + Descriptions: []string{fmt.Sprintf("oom_reaper: reaped process %d (vector), now anon-rss:0kB, file-rss:0kB, shmem-rss:0", i)}, + }, + }) + } + + for _, ev := range events { + assert.NoError(t, store.Insert(ctx, ev)) + } + + events, err = store.Get(ctx, first.Add(-30*time.Second)) + assert.NoError(t, err) + assert.Equal(t, eventsN, len(events)) + + // make sure timestamp is in descending order + for i := 1; i < len(events); i++ { + assert.Greater(t, events[i-1].Time.Unix(), events[i].Time.Unix(), "timestamps should be in descending order") + // Since events are returned in descending order (newest first), + // the message index should be eventsN - (i + 1) for the current event + expectedMsg := fmt.Sprintf("OOM event %d occurred", eventsN-(i+1)) + assert.Equal(t, expectedMsg, events[i].Message, "messages should match in descending order") + } + + deleted, err := store.Purge(ctx, first.Add(time.Duration(eventsN*2)*time.Second).Unix()) + assert.NoError(t, err) + assert.Equal(t, eventsN, deleted) +} + +func TestGetEventsTimeRange(t *testing.T) { + t.Parallel() + + testTableName := "test_table" + + dbRW, dbRO, cleanup := sqlite.OpenTestDB(t) + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + db, err := NewStore(dbRW, dbRO, testTableName, 0) + assert.NoError(t, err) + + baseTime := time.Now().UTC() + events := []components.Event{ + { + Time: metav1.Time{Time: baseTime.Add(-10 * time.Minute)}, + Name: "dmesg", + Type: common.EventTypeWarning, + SuggestedActions: &common.SuggestedActions{ + Descriptions: []string{"old event"}, + }, + }, + { + Time: metav1.Time{Time: baseTime.Add(-5 * time.Minute)}, + Name: "dmesg", + Type: common.EventTypeWarning, + SuggestedActions: &common.SuggestedActions{ + Descriptions: []string{"mid event"}, + }, + }, + { + Time: metav1.Time{Time: baseTime}, + Name: "dmesg", + Type: common.EventTypeWarning, + SuggestedActions: &common.SuggestedActions{ + Descriptions: []string{"recent event"}, + }, + }, + } + + for _, ev := range events { + assert.NoError(t, db.Insert(ctx, ev)) + } + + // Test getting all events + allEvents, err := db.Get(ctx, baseTime.Add(-15*time.Minute)) + assert.NoError(t, err) + assert.Equal(t, 3, len(allEvents)) + + // Test getting recent events only + recentEvents, err := db.Get(ctx, baseTime.Add(-2*time.Minute)) + assert.NoError(t, err) + assert.Equal(t, 1, len(recentEvents)) + assert.Equal(t, "recent event", recentEvents[0].SuggestedActions.Descriptions[0]) +} + +func TestEmptyResults(t *testing.T) { + t.Parallel() + + testTableName := "test_table" + + dbRW, dbRO, cleanup := sqlite.OpenTestDB(t) + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), 
20*time.Second) + defer cancel() + + store, err := NewStore(dbRW, dbRO, testTableName, 0) + assert.NoError(t, err) + defer store.Close() + + // Test getting events from empty table + events, err := store.Get(ctx, time.Now().Add(-1*time.Hour)) + assert.NoError(t, err) + assert.Nil(t, events) + + // Test purging empty table + deleted, err := store.Purge(ctx, time.Now().Unix()) + assert.NoError(t, err) + assert.Equal(t, 0, deleted) +} + +func TestMultipleEventTypes(t *testing.T) { + t.Parallel() + + testTableName := "test_table" + + dbRW, dbRO, cleanup := sqlite.OpenTestDB(t) + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + store, err := NewStore(dbRW, dbRO, testTableName, 0) + assert.NoError(t, err) + defer store.Close() + + baseTime := time.Now().UTC() + events := []components.Event{ + { + Time: metav1.Time{Time: baseTime}, + Name: "dmesg", + Type: common.EventTypeWarning, + SuggestedActions: &common.SuggestedActions{ + Descriptions: []string{"oom event"}, + }, + }, + { + Time: metav1.Time{Time: baseTime.Add(1 * time.Second)}, + Name: "syslog", + Type: common.EventTypeWarning, + SuggestedActions: &common.SuggestedActions{ + Descriptions: []string{"edac event"}, + }, + }, + { + Time: metav1.Time{Time: baseTime.Add(2 * time.Second)}, + Name: "dmesg", + Type: common.EventTypeWarning, + SuggestedActions: &common.SuggestedActions{ + Descriptions: []string{"cgroup event"}, + }, + }, + } + + for _, ev := range events { + assert.NoError(t, store.Insert(ctx, ev)) + } + + // Get all events + results, err := store.Get(ctx, baseTime.Add(-1*time.Second)) + assert.NoError(t, err) + assert.Equal(t, 3, len(results)) + + // Verify events are in descending order + assert.Equal(t, common.EventTypeWarning, results[0].Type) + assert.Equal(t, common.EventTypeWarning, results[1].Type) + assert.Equal(t, common.EventTypeWarning, results[2].Type) +} + +func TestPurgePartial(t *testing.T) { + t.Parallel() + + testTableName := "test_table" + + dbRW, dbRO, cleanup := sqlite.OpenTestDB(t) + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + store, err := NewStore(dbRW, dbRO, testTableName, 0) + assert.NoError(t, err) + defer store.Close() + + baseTime := time.Now().UTC() + events := []components.Event{ + { + Time: metav1.Time{Time: baseTime.Add(-10 * time.Minute)}, + Name: "dmesg", + Type: common.EventTypeWarning, + ExtraInfo: map[string]string{"id": "old_event"}, + SuggestedActions: &common.SuggestedActions{ + Descriptions: []string{"old event"}, + }, + }, + { + Time: metav1.Time{Time: baseTime}, + Name: "dmesg", + Type: common.EventTypeWarning, + ExtraInfo: map[string]string{"id": "new_event"}, + SuggestedActions: &common.SuggestedActions{ + Descriptions: []string{"recent event"}, + }, + }, + } + + for _, ev := range events { + assert.NoError(t, store.Insert(ctx, ev)) + } + + // Purge only old events + deleted, err := store.Purge(ctx, baseTime.Add(-5*time.Minute).Unix()) + assert.NoError(t, err) + assert.Equal(t, 1, deleted) + + // Verify only recent event remains + remaining, err := store.Get(ctx, baseTime.Add(-15*time.Minute)) + assert.NoError(t, err) + assert.Equal(t, 1, len(remaining)) + extraInfoJSON, err := json.Marshal(remaining[0].ExtraInfo) + assert.NoError(t, err) + assert.Equal(t, `{"id":"new_event"}`, string(extraInfoJSON)) + + // Try to find old event by ExtraInfo + oldEvent := components.Event{ + Time: metav1.Time{Time: baseTime.Add(-10 * time.Minute)}, + Name: "test", + Type: 
common.EventTypeWarning, + ExtraInfo: map[string]string{"id": "old_event"}, + } + found, err := store.Find(ctx, oldEvent) + assert.NoError(t, err) + assert.Nil(t, found, "Old event should not be found after purge") +} + +func TestFindEvent(t *testing.T) { + t.Parallel() + + testTableName := "test_table" + + dbRW, dbRO, cleanup := sqlite.OpenTestDB(t) + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + store, err := NewStore(dbRW, dbRO, testTableName, 0) + assert.NoError(t, err) + defer store.Close() + + baseTime := time.Now().UTC() + testEvent := components.Event{ + Time: metav1.Time{Time: baseTime.Add(-10 * time.Minute)}, + Name: "dmesg", + Type: common.EventTypeWarning, + ExtraInfo: map[string]string{"a": "b"}, + SuggestedActions: &common.SuggestedActions{ + Descriptions: []string{"old event"}, + }, + } + + // Test finding non-existent event + found, err := store.Find(ctx, testEvent) + assert.NoError(t, err) + assert.Nil(t, found) + + // Insert and find the event + assert.NoError(t, store.Insert(ctx, testEvent)) + + found, err = store.Find(ctx, testEvent) + assert.NoError(t, err) + assert.NotNil(t, found) + assert.Equal(t, testEvent.Time.Unix(), found.Time.Unix()) + assert.Equal(t, testEvent.Name, found.Name) + assert.Equal(t, testEvent.Type, found.Type) + assert.Equal(t, testEvent.ExtraInfo, found.ExtraInfo) + assert.Equal(t, testEvent.SuggestedActions.Descriptions[0], found.SuggestedActions.Descriptions[0]) +} + +func TestFindEventPartialMatch(t *testing.T) { + t.Parallel() + + testTableName := "test_table" + + dbRW, dbRO, cleanup := sqlite.OpenTestDB(t) + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + store, err := NewStore(dbRW, dbRO, testTableName, 0) + assert.NoError(t, err) + defer store.Close() + + baseTime := time.Now().UTC() + testEvent := components.Event{ + Time: metav1.Time{Time: baseTime}, + Name: "dmesg", + Type: common.EventTypeWarning, + ExtraInfo: map[string]string{"a": "b"}, + SuggestedActions: &common.SuggestedActions{ + Descriptions: []string{"original details"}, + }, + } + + assert.NoError(t, store.Insert(ctx, testEvent)) + + // Test finding with matching timestamp/source/type but different details + searchEvent := components.Event{ + Time: metav1.Time{Time: testEvent.Time.Time}, + Name: testEvent.Name, + Type: testEvent.Type, + ExtraInfo: testEvent.ExtraInfo, + SuggestedActions: &common.SuggestedActions{ + Descriptions: []string{"different details"}, + }, + } + + found, err := store.Find(ctx, searchEvent) + assert.NoError(t, err) + assert.Nil(t, found) +} + +func TestFindEventMultipleMatches(t *testing.T) { + t.Parallel() + + testTableName := "test_table" + + dbRW, dbRO, cleanup := sqlite.OpenTestDB(t) + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + store, err := NewStore(dbRW, dbRO, testTableName, 0) + assert.NoError(t, err) + defer store.Close() + + baseTime := time.Now().UTC() + events := []components.Event{ + { + Time: metav1.Time{Time: baseTime}, + Name: "dmesg", + Type: common.EventTypeWarning, + ExtraInfo: map[string]string{"a": "b"}, + SuggestedActions: &common.SuggestedActions{ + Descriptions: []string{"first event"}, + }, + }, + { + Time: metav1.Time{Time: baseTime}, + Name: "dmesg", + Type: common.EventTypeWarning, + ExtraInfo: map[string]string{"a": "b"}, + SuggestedActions: &common.SuggestedActions{ + Descriptions: []string{"second event"}, + }, + }, + } + 
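+	// Note: findEvent queries without an ORDER BY, so when multiple rows share
+	// the same timestamp, name, and type, which row QueryRowContext returns is
+	// unspecified; the check below accepts either inserted event.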
+ // Insert multiple events with same timestamp/source/type + for _, ev := range events { + assert.NoError(t, store.Insert(ctx, ev)) + } + + // Search should return the first matching event + searchEvent := components.Event{ + Time: metav1.Time{Time: baseTime}, + Name: "dmesg", + Type: common.EventTypeWarning, + ExtraInfo: map[string]string{"a": "b"}, + } + + found, err := store.Find(ctx, searchEvent) + assert.NoError(t, err) + assert.NotNil(t, found) + + // Should match one of the events + foundMatch := false + for _, ev := range events { + if found.SuggestedActions.Descriptions[0] == ev.SuggestedActions.Descriptions[0] { + foundMatch = true + break + } + } + assert.True(t, foundMatch, "Found event should match one of the inserted events") +} + +func TestEventWithIDs(t *testing.T) { + t.Parallel() + + testTableName := "test_table" + + dbRW, dbRO, cleanup := sqlite.OpenTestDB(t) + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + store, err := NewStore(dbRW, dbRO, testTableName, 0) + assert.NoError(t, err) + defer store.Close() + + baseTime := time.Now().UTC() + event := components.Event{ + Time: metav1.Time{Time: baseTime}, + Name: "nvidia-smi", + Type: common.EventTypeWarning, + ExtraInfo: map[string]string{ + "xid": "123", + "gpu_uuid": "gpu-123", + }, + SuggestedActions: &common.SuggestedActions{ + Descriptions: []string{"GPU error details"}, + }, + } + + // Test insert and find with ExtraInfo + err = store.Insert(ctx, event) + assert.NoError(t, err) + + found, err := store.Find(ctx, event) + assert.NoError(t, err) + assert.NotNil(t, found) + assert.Equal(t, event.ExtraInfo, found.ExtraInfo) + + // Test find with partial ExtraInfo match + partialEvent := components.Event{ + Time: metav1.Time{Time: event.Time.Time}, + Name: event.Name, + Type: event.Type, + } + + found, err = store.Find(ctx, partialEvent) + assert.NoError(t, err) + assert.NotNil(t, found) + assert.Equal(t, event.ExtraInfo, found.ExtraInfo) + + // Test find with different ExtraInfo + differentEvent := components.Event{ + Time: metav1.Time{Time: event.Time.Time}, + Name: event.Name, + Type: event.Type, + ExtraInfo: map[string]string{ + "xid": "different", + "gpu_uuid": "different-gpu", + }, + } + + found, err = store.Find(ctx, differentEvent) + assert.NoError(t, err) + assert.Nil(t, found, "Should not find event with different ExtraInfo") +} + +func TestNullEventIDs(t *testing.T) { + t.Parallel() + + testTableName := "test_table" + + dbRW, dbRO, cleanup := sqlite.OpenTestDB(t) + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + store, err := NewStore(dbRW, dbRO, testTableName, 0) + assert.NoError(t, err) + defer store.Close() + + baseTime := time.Now().UTC() + event := components.Event{ + Time: metav1.Time{Time: baseTime}, + Name: "dmesg", + Type: common.EventTypeWarning, + ExtraInfo: map[string]string{}, + SuggestedActions: &common.SuggestedActions{ + Descriptions: []string{"Event with null ExtraInfo"}, + }, + } + + // Test insert and find with null ExtraInfo + err = store.Insert(ctx, event) + assert.NoError(t, err) + + found, err := store.Find(ctx, event) + assert.NoError(t, err) + assert.NotNil(t, found) + assert.Equal(t, len(found.ExtraInfo), 0) +} + +func TestPurgeWithEventIDs(t *testing.T) { + t.Parallel() + + testTableName := "test_table" + + dbRW, dbRO, cleanup := sqlite.OpenTestDB(t) + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer 
cancel() + + db, err := NewStore(dbRW, dbRO, testTableName, 0) + assert.NoError(t, err) + + baseTime := time.Now().UTC() + events := []components.Event{ + { + Time: metav1.Time{Time: baseTime.Add(-10 * time.Minute)}, + Name: "test", + Type: common.EventTypeWarning, + ExtraInfo: map[string]string{"id": "old_event"}, + SuggestedActions: &common.SuggestedActions{ + Descriptions: []string{"old event"}, + }, + }, + { + Time: metav1.Time{Time: baseTime}, + Name: "test", + Type: common.EventTypeWarning, + ExtraInfo: map[string]string{"id": "new_event"}, + SuggestedActions: &common.SuggestedActions{ + Descriptions: []string{"new event"}, + }, + }, + } + + for _, event := range events { + err = db.Insert(ctx, event) + assert.NoError(t, err) + } + + // Purge old events + deleted, err := db.Purge(ctx, baseTime.Add(-5*time.Minute).Unix()) + assert.NoError(t, err) + assert.Equal(t, 1, deleted) + + // Verify only new event remains with correct ExtraInfo + remaining, err := db.Get(ctx, baseTime.Add(-15*time.Minute)) + assert.NoError(t, err) + assert.Equal(t, 1, len(remaining)) + extraInfoJSON, err := json.Marshal(remaining[0].ExtraInfo) + assert.NoError(t, err) + assert.Equal(t, `{"id":"new_event"}`, string(extraInfoJSON)) + + // Try to find old event by ExtraInfo + oldEvent := components.Event{ + Time: metav1.Time{Time: baseTime.Add(-10 * time.Minute)}, + Name: "test", + Type: common.EventTypeWarning, + ExtraInfo: map[string]string{"id": "old_event"}, + } + found, err := db.Find(ctx, oldEvent) + assert.NoError(t, err) + assert.Nil(t, found, "Old event should not be found after purge") +} + +func TestInvalidTableName(t *testing.T) { + t.Parallel() + + dbRW, dbRO, cleanup := sqlite.OpenTestDB(t) + defer cleanup() + + // Test with invalid table name + _, err := NewStore(dbRW, dbRO, "invalid;table;name", 0) + assert.Error(t, err) +} + +func TestContextCancellation(t *testing.T) { + t.Parallel() + + testTableName := "test_table" + + dbRW, dbRO, cleanup := sqlite.OpenTestDB(t) + defer cleanup() + + store, err := NewStore(dbRW, dbRO, testTableName, 0) + assert.NoError(t, err) + defer store.Close() + + // Test with canceled context + canceledCtx, cancel := context.WithCancel(context.Background()) + cancel() // Cancel immediately + + event := components.Event{ + Time: metav1.Time{Time: time.Now().UTC()}, + Name: "test", + Type: common.EventTypeWarning, + SuggestedActions: &common.SuggestedActions{ + Descriptions: []string{"Test details"}, + }, + } + + err = store.Insert(canceledCtx, event) + assert.Error(t, err) + + _, err = store.Find(canceledCtx, event) + assert.Error(t, err) + + _, err = store.Get(canceledCtx, time.Now().Add(-1*time.Hour)) + assert.Error(t, err) +} + +func TestConcurrentAccess(t *testing.T) { + t.Parallel() + + testTableName := "test_table" + + dbRW, dbRO, cleanup := sqlite.OpenTestDB(t) + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + db, err := NewStore(dbRW, dbRO, testTableName, 0) + assert.NoError(t, err) + + baseTime := time.Now().UTC() + eventCount := 100 + done := make(chan bool) + + // Concurrent inserts + go func() { + for i := 0; i < eventCount; i++ { + event := components.Event{ + Time: metav1.Time{Time: baseTime.Add(time.Duration(i) * time.Second)}, + Name: "concurrent", + Type: common.EventTypeWarning, + SuggestedActions: &common.SuggestedActions{ + Descriptions: []string{fmt.Sprintf("Concurrent event %d", i)}, + }, + } + assert.NoError(t, db.Insert(ctx, event)) + } + done <- true + }() + + // Concurrent reads + 
go func() { + for i := 0; i < eventCount; i++ { + _, err := db.Get(ctx, baseTime.Add(-1*time.Hour)) + assert.NoError(t, err) + } + done <- true + }() + + // Wait for both goroutines to complete + <-done + <-done + + // Verify final count + events, err := db.Get(ctx, baseTime.Add(-1*time.Hour)) + assert.NoError(t, err) + assert.Equal(t, eventCount, len(events)) +} + +func TestSpecialCharactersInEvents(t *testing.T) { + t.Parallel() + + testTableName := "test_table" + + dbRW, dbRO, cleanup := sqlite.OpenTestDB(t) + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + store, err := NewStore(dbRW, dbRO, testTableName, 0) + assert.NoError(t, err) + defer store.Close() + + events := []components.Event{ + { + Time: metav1.Time{Time: time.Now().UTC()}, + Name: "test;source", + Type: common.EventTypeWarning, + Message: "message with special chars: !@#$%^&*()", + ExtraInfo: map[string]string{"special chars": "!@#$%^&*()"}, + SuggestedActions: &common.SuggestedActions{ + Descriptions: []string{"details with special chars"}, + }, + }, + { + Time: metav1.Time{Time: time.Now().UTC()}, + Name: "unicode_source_🔥", + Type: common.EventTypeWarning, + Message: "unicode message: 你好", + ExtraInfo: map[string]string{"unicode info": "你好"}, + SuggestedActions: &common.SuggestedActions{ + Descriptions: []string{"unicode details: 世界!"}, + }, + }, + } + + // Test insert and retrieval of events with special characters + for _, event := range events { + err = store.Insert(ctx, event) + assert.NoError(t, err) + + found, err := store.Find(ctx, event) + assert.NoError(t, err) + assert.NotNil(t, found) + assert.Equal(t, event.Name, found.Name) + assert.Equal(t, event.Type, found.Type) + assert.Equal(t, event.Message, found.Message) + assert.Equal(t, event.ExtraInfo, found.ExtraInfo) + assert.Equal(t, event.SuggestedActions.Descriptions[0], found.SuggestedActions.Descriptions[0]) + } +} + +func TestLargeEventDetails(t *testing.T) { + t.Parallel() + + testTableName := "test_table" + + dbRW, dbRO, cleanup := sqlite.OpenTestDB(t) + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + db, err := NewStore(dbRW, dbRO, testTableName, 0) + assert.NoError(t, err) + + // Create a large event detail string (100KB) + largeDetail := make([]byte, 100*1024) + for i := range largeDetail { + largeDetail[i] = byte('a' + (i % 26)) + } + + event := components.Event{ + Time: metav1.Time{Time: time.Now().UTC()}, + Name: "test", + Type: common.EventTypeWarning, + SuggestedActions: &common.SuggestedActions{ + Descriptions: []string{string(largeDetail)}, + }, + } + + err = db.Insert(ctx, event) + assert.NoError(t, err) + + found, err := db.Find(ctx, event) + assert.NoError(t, err) + assert.NotNil(t, found) + assert.Equal(t, event.SuggestedActions.Descriptions[0], found.SuggestedActions.Descriptions[0]) +} + +func TestTimestampBoundaries(t *testing.T) { + t.Parallel() + + testTableName := "test_table" + + dbRW, dbRO, cleanup := sqlite.OpenTestDB(t) + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + store, err := NewStore(dbRW, dbRO, testTableName, 0) + assert.NoError(t, err) + defer store.Close() + + timestamps := []int64{ + 0, // Unix epoch + -1, // Before Unix epoch + 1 << 32, // Large timestamp + -(1 << 31), // Large negative timestamp + time.Now().Unix(), // Current time + 1 << 62, // Very large timestamp + -((1 << 62) + 100), // Very large negative timestamp + 
} + + for _, ts := range timestamps { + event := components.Event{ + Time: metav1.Time{Time: time.Unix(ts, 0)}, + Name: "test", + Type: common.EventTypeWarning, + SuggestedActions: &common.SuggestedActions{ + Descriptions: []string{fmt.Sprintf("timestamp: %d", ts)}, + }, + } + + err = store.Insert(ctx, event) + assert.NoError(t, err) + + found, err := store.Find(ctx, event) + assert.NoError(t, err) + assert.NotNil(t, found) + assert.Equal(t, ts, found.Time.Unix()) + } + + // Test retrieval with various time ranges + events, err := store.Get(ctx, time.Unix(-(1<<63), 0)) // Get all events + assert.NoError(t, err) + assert.Equal(t, len(timestamps), len(events)) + + events, err = store.Get(ctx, time.Unix(1<<63-1, 0)) // Future time + assert.NoError(t, err) + assert.Nil(t, events) +} + +func TestConcurrentWritesWithDifferentIDs(t *testing.T) { + t.Parallel() + + testTableName := "test_table" + + dbRW, dbRO, cleanup := sqlite.OpenTestDB(t) + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + store, err := NewStore(dbRW, dbRO, testTableName, 0) + assert.NoError(t, err) + defer store.Close() + + baseTime := time.Now().UTC() + eventCount := 100 + done := make(chan bool) + + // Concurrent inserts + go func() { + for i := 0; i < eventCount; i++ { + event := components.Event{ + Time: metav1.Time{Time: baseTime.Add(time.Duration(i) * time.Second)}, + Name: "concurrent", + Type: common.EventTypeWarning, + ExtraInfo: map[string]string{fmt.Sprintf("info_%d", i): fmt.Sprintf("Concurrent event %d", i)}, + SuggestedActions: &common.SuggestedActions{ + Descriptions: []string{fmt.Sprintf("Concurrent event %d", i)}, + }, + } + assert.NoError(t, store.Insert(ctx, event)) + } + done <- true + }() + + // Concurrent reads + go func() { + for i := 0; i < eventCount; i++ { + event := components.Event{ + Time: metav1.Time{Time: baseTime.Add(time.Duration(i) * time.Second)}, + Name: "concurrent", + Type: common.EventTypeWarning, + ExtraInfo: map[string]string{fmt.Sprintf("info_%d", i): fmt.Sprintf("Concurrent event %d", i)}, + } + found, err := store.Find(ctx, event) + if err == nil && found != nil { + assert.Equal(t, event.ExtraInfo, found.ExtraInfo) + } + } + done <- true + }() + + // Wait for both goroutines to complete + <-done + <-done + + // Verify all events were inserted + events, err := store.Get(ctx, baseTime.Add(-1*time.Hour)) + assert.NoError(t, err) + assert.Equal(t, eventCount, len(events)) + + // Verify each event has unique info + infoMap := make(map[string]bool) + for _, event := range events { + // Convert the entire ExtraInfo map to a string for comparison + infoStr := fmt.Sprintf("%v", event.ExtraInfo) + assert.False(t, infoMap[infoStr], "Duplicate extra info found") + infoMap[infoStr] = true + } +} + +func TestNewStoreErrors(t *testing.T) { + t.Parallel() + + testTableName := "test_table" + + // Test case: nil write DB + dbRW, dbRO, cleanup := sqlite.OpenTestDB(t) + defer cleanup() + store, err := NewStore(nil, dbRO, testTableName, 0) + assert.Error(t, err) + assert.ErrorIs(t, err, ErrNoDBRWSet) + assert.Nil(t, store) + + // Test case: nil read DB + store, err = NewStore(dbRW, nil, testTableName, 0) + assert.Error(t, err) + assert.ErrorIs(t, err, ErrNoDBROSet) + assert.Nil(t, store) + + // Test case: both DBs nil + store, err = NewStore(nil, nil, testTableName, 0) + assert.Error(t, err) + assert.ErrorIs(t, err, ErrNoDBRWSet) + assert.Nil(t, store) +} + +func TestEventMessage(t *testing.T) { + t.Parallel() + + testTableName := 
"test_table" + + dbRW, dbRO, cleanup := sqlite.OpenTestDB(t) + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + store, err := NewStore(dbRW, dbRO, testTableName, 0) + assert.NoError(t, err) + defer store.Close() + + baseTime := time.Now().UTC() + events := []components.Event{ + { + Time: metav1.Time{Time: baseTime}, + Name: "test", + Type: common.EventTypeWarning, + Message: "Test message with normal text", + }, + { + Time: metav1.Time{Time: baseTime.Add(1 * time.Second)}, + Name: "test", + Type: common.EventTypeWarning, + Message: "", // Empty message + }, + { + Time: metav1.Time{Time: baseTime.Add(2 * time.Second)}, + Name: "test", + Type: common.EventTypeWarning, + Message: "Message with special chars: !@#$%^&*()", + }, + { + Time: metav1.Time{Time: baseTime.Add(3 * time.Second)}, + Name: "test", + Type: common.EventTypeWarning, + Message: "Unicode message: 你好世界", + }, + } + + // Test insert and retrieval + for _, event := range events { + err = store.Insert(ctx, event) + assert.NoError(t, err) + + found, err := store.Find(ctx, event) + assert.NoError(t, err) + assert.NotNil(t, found) + assert.Equal(t, event.Message, found.Message) + } + + // Test finding with message as part of search criteria + searchEvent := components.Event{ + Time: metav1.Time{Time: baseTime}, + Name: "test", + Type: common.EventTypeWarning, + Message: "Test message with normal text", + } + found, err := store.Find(ctx, searchEvent) + assert.NoError(t, err) + assert.NotNil(t, found) + assert.Equal(t, searchEvent.Message, found.Message) + + // Test finding with empty message + emptyMessageEvent := components.Event{ + Time: metav1.Time{Time: baseTime.Add(1 * time.Second)}, + Name: "test", + Type: common.EventTypeWarning, + Message: "", + } + found, err = store.Find(ctx, emptyMessageEvent) + assert.NoError(t, err) + assert.NotNil(t, found) + assert.Equal(t, "", found.Message) + + // Test finding with non-matching message + nonMatchingEvent := components.Event{ + Time: metav1.Time{Time: baseTime}, + Name: "test", + Type: common.EventTypeWarning, + Message: "Non-matching message", + } + found, err = store.Find(ctx, nonMatchingEvent) + assert.NoError(t, err) + assert.Nil(t, found) + + // Test getting all events and verify messages + allEvents, err := store.Get(ctx, baseTime.Add(-1*time.Second)) + assert.NoError(t, err) + assert.Equal(t, len(events), len(allEvents)) + + // Verify messages are preserved in descending timestamp order + for i, event := range allEvents { + expectedMsg := events[len(events)-1-i].Message + assert.Equal(t, expectedMsg, event.Message) + } +} + +func TestNilSuggestedActions(t *testing.T) { + t.Parallel() + + testTableName := "test_table" + + dbRW, dbRO, cleanup := sqlite.OpenTestDB(t) + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + store, err := NewStore(dbRW, dbRO, testTableName, 0) + assert.NoError(t, err) + defer store.Close() + + baseTime := time.Now().UTC() + event := components.Event{ + Time: metav1.Time{Time: baseTime}, + Name: "test", + Type: common.EventTypeWarning, + Message: "Test message", + ExtraInfo: map[string]string{"key": "value"}, + SuggestedActions: nil, // Explicitly set to nil + } + + // Test insert and find with nil SuggestedActions + err = store.Insert(ctx, event) + assert.NoError(t, err) + + found, err := store.Find(ctx, event) + assert.NoError(t, err) + assert.NotNil(t, found) + assert.Nil(t, found.SuggestedActions) +} + +func 
TestInvalidJSONHandling(t *testing.T) { + t.Parallel() + + testTableName := "test_table" + + dbRW, dbRO, cleanup := sqlite.OpenTestDB(t) + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + store, err := NewStore(dbRW, dbRO, testTableName, 0) + assert.NoError(t, err) + defer store.Close() + + // Insert a valid event first + baseTime := time.Now().UTC() + event := components.Event{ + Time: metav1.Time{Time: baseTime}, + Name: "test", + Type: common.EventTypeWarning, + ExtraInfo: map[string]string{"key": "value"}, + SuggestedActions: &common.SuggestedActions{ + Descriptions: []string{"test action"}, + }, + } + err = store.Insert(ctx, event) + assert.NoError(t, err) + + // Manually insert invalid JSON into the database + _, err = dbRW.ExecContext(ctx, fmt.Sprintf(` + INSERT INTO %s (timestamp, name, type, extra_info, suggested_actions) + VALUES (?, ?, ?, ?, ?)`, + testTableName), + baseTime.Add(time.Second).Unix(), + "test", + common.EventTypeWarning, + "{invalid_json", // Invalid JSON for ExtraInfo + "{invalid_json", // Invalid JSON for SuggestedActions + ) + assert.NoError(t, err) + + // Try to retrieve the events - should get error for invalid JSON + _, err = store.Get(ctx, baseTime.Add(-time.Hour)) + assert.Error(t, err) + assert.Contains(t, err.Error(), "failed to unmarshal") +} + +func TestEmptyTableName(t *testing.T) { + t.Parallel() + + dbRW, dbRO, cleanup := sqlite.OpenTestDB(t) + defer cleanup() + + // Test with empty table name + store, err := NewStore(dbRW, dbRO, "", 0) + assert.Error(t, err) + assert.Nil(t, store) +} + +func TestLongEventFields(t *testing.T) { + t.Parallel() + + testTableName := "test_table" + + dbRW, dbRO, cleanup := sqlite.OpenTestDB(t) + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + store, err := NewStore(dbRW, dbRO, testTableName, 0) + assert.NoError(t, err) + defer store.Close() + + // Create very long strings for various fields + longString := strings.Repeat("a", 10000) + longMap := make(map[string]string) + for i := 0; i < 100; i++ { + longMap[fmt.Sprintf("key_%d", i)] = longString + } + + event := components.Event{ + Time: metav1.Time{Time: time.Now().UTC()}, + Name: longString, + Type: common.EventTypeWarning, + Message: longString, + ExtraInfo: longMap, + SuggestedActions: &common.SuggestedActions{ + Descriptions: []string{longString}, + }, + } + + // Test insert and retrieval of event with very long fields + err = store.Insert(ctx, event) + assert.NoError(t, err) + + found, err := store.Find(ctx, event) + assert.NoError(t, err) + assert.NotNil(t, found) + assert.Equal(t, event.Name, found.Name) + assert.Equal(t, event.Message, found.Message) + assert.Equal(t, event.ExtraInfo, found.ExtraInfo) + assert.Equal(t, event.SuggestedActions.Descriptions[0], found.SuggestedActions.Descriptions[0]) +} + +func TestConcurrentTableCreation(t *testing.T) { + t.Parallel() + + dbRW, dbRO, cleanup := sqlite.OpenTestDB(t) + defer cleanup() + + // Try to create multiple stores with the same table name concurrently + tableName := "concurrent_table" + concurrency := 10 + var wg sync.WaitGroup + stores := make([]Store, concurrency) + errors := make([]error, concurrency) + + for i := 0; i < concurrency; i++ { + wg.Add(1) + go func(index int) { + defer wg.Done() + store, err := NewStore(dbRW, dbRO, tableName, 0) + assert.NoError(t, err) + defer store.Close() + + stores[index] = store + errors[index] = err + }(i) + } + + wg.Wait() + + // Verify 
that all attempts either succeeded or failed gracefully + successCount := 0 + for i := 0; i < concurrency; i++ { + if errors[i] == nil { + successCount++ + assert.NotNil(t, stores[i]) + } + } + assert.Greater(t, successCount, 0) +} + +func TestEventTypeValidation(t *testing.T) { + t.Parallel() + + testTableName := "test_table" + + dbRW, dbRO, cleanup := sqlite.OpenTestDB(t) + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + store, err := NewStore(dbRW, dbRO, testTableName, 0) + assert.NoError(t, err) + defer store.Close() + + // Test all valid event types + validTypes := []common.EventType{ + common.EventTypeWarning, + common.EventTypeInfo, + common.EventTypeCritical, + common.EventTypeFatal, + common.EventTypeUnknown, + } + + baseTime := time.Now().UTC() + for i, eventType := range validTypes { + event := components.Event{ + Time: metav1.Time{Time: baseTime.Add(time.Duration(i) * time.Second)}, + Name: "test", + Type: eventType, + Message: fmt.Sprintf("Test message for %s", eventType), + } + + err = store.Insert(ctx, event) + assert.NoError(t, err) + + found, err := store.Find(ctx, event) + assert.NoError(t, err) + assert.NotNil(t, found) + assert.Equal(t, eventType, found.Type) + } + + // Verify all events can be retrieved + events, err := store.Get(ctx, baseTime.Add(-time.Hour)) + assert.NoError(t, err) + assert.Equal(t, len(validTypes), len(events)) +} + +func TestRetentionPurge(t *testing.T) { + t.Parallel() + + testTableName := "test_table" + + dbRW, dbRO, cleanup := sqlite.OpenTestDB(t) + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + // Create store with 10 second retention + store, err := NewStore(dbRW, dbRO, testTableName, 10*time.Second) + assert.NoError(t, err) + defer store.Close() + + baseTime := time.Now().UTC() + events := []components.Event{ + { + Time: metav1.Time{Time: baseTime.Add(-15 * time.Second)}, + Name: "test", + Type: common.EventTypeWarning, + ExtraInfo: map[string]string{"id": "old_event"}, + SuggestedActions: &common.SuggestedActions{ + Descriptions: []string{"old event"}, + }, + }, + { + Time: metav1.Time{Time: baseTime.Add(-5 * time.Second)}, + Name: "test", + Type: common.EventTypeWarning, + ExtraInfo: map[string]string{"id": "new_event"}, + SuggestedActions: &common.SuggestedActions{ + Descriptions: []string{"new event"}, + }, + }, + } + + // Insert events + for _, event := range events { + err = store.Insert(ctx, event) + assert.NoError(t, err) + } + + // Wait for purge to run (retention/10 = 1 second) + time.Sleep(2 * time.Second) + + // Verify only new event remains + remaining, err := store.Get(ctx, baseTime.Add(-20*time.Second)) + assert.NoError(t, err) + assert.Equal(t, 1, len(remaining)) + assert.Equal(t, "new_event", remaining[0].ExtraInfo["id"]) +} diff --git a/components/info/component.go b/components/info/component.go index f88f3684..2d43c06a 100644 --- a/components/info/component.go +++ b/components/info/component.go @@ -13,7 +13,6 @@ import ( "github.com/leptonai/gpud/components" info_id "github.com/leptonai/gpud/components/info/id" - "github.com/leptonai/gpud/components/state" "github.com/leptonai/gpud/log" "github.com/leptonai/gpud/manager" "github.com/leptonai/gpud/pkg/file" @@ -126,7 +125,7 @@ func (c *component) States(ctx context.Context) ([]components.State, error) { dbSizeHumanized string ) if c.dbRO != nil { - dbSize, err = state.ReadDBSize(ctx, c.dbRO) + dbSize, err = 
sqlite.ReadDBSize(ctx, c.dbRO) if err != nil { return nil, err } diff --git a/components/state/state.go b/components/state/state.go index 30dc1cb9..d0c17bb9 100644 --- a/components/state/state.go +++ b/components/state/state.go @@ -205,30 +205,8 @@ func Register(reg *prometheus.Registry) error { return nil } -func ReadDBSize(ctx context.Context, db *sql.DB) (uint64, error) { - var pageCount uint64 - err := db.QueryRowContext(ctx, "PRAGMA page_count").Scan(&pageCount) - if err == sql.ErrNoRows { - return 0, errors.New("no page count") - } - if err != nil { - return 0, err - } - - var pageSize uint64 - err = db.QueryRowContext(ctx, "PRAGMA page_size").Scan(&pageSize) - if err == sql.ErrNoRows { - return 0, errors.New("no page size") - } - if err != nil { - return 0, err - } - - return pageCount * pageSize, nil -} - func RecordMetrics(ctx context.Context, db *sql.DB) error { - dbSize, err := ReadDBSize(ctx, db) + dbSize, err := sqlite.ReadDBSize(ctx, db) if err != nil { return err } @@ -236,13 +214,3 @@ func RecordMetrics(ctx context.Context, db *sql.DB) error { return nil } - -func Compact(ctx context.Context, db *sql.DB) error { - log.Logger.Infow("compacting state database") - _, err := db.ExecContext(ctx, "VACUUM;") - if err != nil { - return err - } - log.Logger.Infow("successfully compacted state database") - return nil -} diff --git a/components/state/state_test.go b/components/state/state_test.go index 5306f81b..fbb6c6ac 100644 --- a/components/state/state_test.go +++ b/components/state/state_test.go @@ -71,13 +71,4 @@ func TestRecordMetrics(t *testing.T) { if err := RecordMetrics(ctx, dbRO); err != nil { t.Fatal("failed to record metrics:", err) } - if err := Compact(ctx, dbRW); err != nil { - t.Fatal("failed to compact database:", err) - } - - size, err := ReadDBSize(ctx, dbRO) - if err != nil { - t.Fatal("failed to read db size:", err) - } - t.Log(size) } diff --git a/internal/server/server.go b/internal/server/server.go index 78eceee5..d2288b27 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -1204,7 +1204,7 @@ func New(ctx context.Context, config *lepconfig.Config, endpoint string, cliUID ticker.Reset(config.CompactPeriod.Duration) } - if err := state.Compact(ctx, dbRW); err != nil { + if err := sqlite.Compact(ctx, dbRW); err != nil { log.Logger.Errorw("failed to compact state database", "error", err) } } diff --git a/pkg/sqlite/sqlite.go b/pkg/sqlite/sqlite.go index afbae65e..30a68b77 100644 --- a/pkg/sqlite/sqlite.go +++ b/pkg/sqlite/sqlite.go @@ -2,9 +2,13 @@ package sqlite import ( + "context" "database/sql" + "errors" "fmt" + "github.com/leptonai/gpud/log" + _ "github.com/mattn/go-sqlite3" ) @@ -52,3 +56,35 @@ func Open(file string, opts ...OpOption) (*sql.DB, error) { return db, nil } + +func ReadDBSize(ctx context.Context, db *sql.DB) (uint64, error) { + var pageCount uint64 + err := db.QueryRowContext(ctx, "PRAGMA page_count").Scan(&pageCount) + if err == sql.ErrNoRows { + return 0, errors.New("no page count") + } + if err != nil { + return 0, err + } + + var pageSize uint64 + err = db.QueryRowContext(ctx, "PRAGMA page_size").Scan(&pageSize) + if err == sql.ErrNoRows { + return 0, errors.New("no page size") + } + if err != nil { + return 0, err + } + + return pageCount * pageSize, nil +} + +func Compact(ctx context.Context, db *sql.DB) error { + log.Logger.Infow("compacting state database") + _, err := db.ExecContext(ctx, "VACUUM;") + if err != nil { + return err + } + log.Logger.Infow("successfully compacted state database") + return nil +}
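
A minimal usage sketch of the new events store, for reviewers; it is not part of the patch. The component name ("memory"), the retention value, the example package and function names, and the way the two *sql.DB handles are obtained are illustrative assumptions; only db.CreateDefaultTableName, NewStore, and the Store methods come from this change.

package example

import (
	"context"
	"database/sql"
	"fmt"
	"time"

	"github.com/leptonai/gpud/components"
	"github.com/leptonai/gpud/components/common"
	eventsdb "github.com/leptonai/gpud/components/db"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// recordOOMEvent shows the intended call pattern: create the per-component
// table, dedupe with Find, insert, then read back in descending timestamp
// order. dbRW and dbRO are the usual writable and read-only handles
// (e.g. opened via pkg/sqlite).
func recordOOMEvent(ctx context.Context, dbRW, dbRO *sql.DB) error {
	// Resolves to "components_memory_events_v0_4_0".
	tableName := eventsdb.CreateDefaultTableName("memory")

	// A retention of at least one second starts the background purge goroutine.
	store, err := eventsdb.NewStore(dbRW, dbRO, tableName, 3*24*time.Hour)
	if err != nil {
		return err
	}
	defer store.Close()

	ev := components.Event{
		Time:      metav1.Time{Time: time.Now().UTC()},
		Name:      "memory_oom",
		Type:      common.EventTypeWarning,
		Message:   "VFS file-max limit reached",
		ExtraInfo: map[string]string{"data_source": "dmesg"},
	}

	// Find returns (nil, nil) when no matching row exists, so it doubles as a
	// dedupe check before inserting.
	found, err := store.Find(ctx, ev)
	if err != nil {
		return err
	}
	if found == nil {
		if err := store.Insert(ctx, ev); err != nil {
			return err
		}
	}

	// Get returns events newest-first; the since argument is exclusive.
	events, err := store.Get(ctx, time.Now().Add(-24*time.Hour))
	if err != nil {
		return err
	}
	fmt.Printf("%d event(s) in the last 24h\n", len(events))
	return nil
}

Purge(ctx, beforeTimestamp) deletes rows with a timestamp older than the given unix seconds and returns the number deleted; when NewStore is given a retention of at least one second, the runPurge loop calls it automatically at roughly retention/5 intervals, with a one-second floor on the check interval.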