Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(components/db): add common events store (similar to components.Event) #332

Merged
merged 9 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
342 changes: 342 additions & 0 deletions components/db/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,342 @@
package db

import (
"context"
"database/sql"
"errors"
"fmt"
"time"

"github.com/leptonai/gpud/log"
"github.com/leptonai/gpud/pkg/sqlite"

_ "github.com/mattn/go-sqlite3"
)

const (
// Event timestamp in unix seconds.
ColumnTimestamp = "timestamp"

// event name
// e.g., "memory_oom", "memory_oom_kill_constraint", "memory_oom_cgroup", "memory_edac_correctable_errors".
ColumnName = "name"

// event type
// e.g., "Unknown", "Info", "Warning", "Critical", "Fatal".
ColumnType = "type"

// event message
// e.g., "VFS file-max limit reached"
ColumnMessage = "message"

// event extra info
// e.g.,
// data source: "dmesg", "nvml", "nvidia-smi".
// event target: "gpu_id", "gpu_uuid".
// log detail: "oom_reaper: reaped process 345646 (vector), now anon-rss:0kB, file-rss:0kB, shmem-rss:0".
ColumnExtraInfo = "extra_info"

// event suggested actions
// e.g., "reboot"
ColumnSuggestedActions = "suggested_actions"
)

type Event struct {
// Event timestamp in unix seconds.
Timestamp int64 `json:"timestamp"`

// event name
// e.g., "memory_oom", "memory_oom_kill_constraint", "memory_oom_cgroup", "memory_edac_correctable_errors".
Name string `json:"name"`

// event type
// e.g., "Unknown", "Info", "Warning", "Critical", "Fatal".
Type string `json:"type"`

// event message
// e.g., "VFS file-max limit reached"
Message string `json:"message"`

// event extra info
// e.g.,
// data source: "dmesg", "nvml", "nvidia-smi".
// event target: "gpu_id", "gpu_uuid".
// log detail: "oom_reaper: reaped process 345646 (vector), now anon-rss:0kB, file-rss:0kB, shmem-rss:0".
ExtraInfo string `json:"extra_info"`
gyuho marked this conversation as resolved.
Show resolved Hide resolved

// event suggested actions
// e.g., "reboot"
SuggestedActions string `json:"suggested_actions"`
gyuho marked this conversation as resolved.
Show resolved Hide resolved
gyuho marked this conversation as resolved.
Show resolved Hide resolved
}

type storeImpl struct {
table string
dbRW *sql.DB
dbRO *sql.DB
}

var (
ErrNoDBRWSet = errors.New("no writable db set")
ErrNoDBROSet = errors.New("no read-only db set")
)

// Creates a new DB instance with the table created.
// Requires write-only and read-only instances for minimize conflicting writes/reads.
// ref. https://github.com/mattn/go-sqlite3/issues/1179#issuecomment-1638083995
func NewStore(ctx context.Context, dbRW *sql.DB, dbRO *sql.DB, tableName string) (*storeImpl, error) {
if dbRW == nil {
return nil, ErrNoDBRWSet
}
if dbRO == nil {
return nil, ErrNoDBROSet
}
gyuho marked this conversation as resolved.
Show resolved Hide resolved

if err := createTable(ctx, dbRW, tableName); err != nil {
return nil, err
}
return &storeImpl{
table: tableName,
dbRW: dbRW,
dbRO: dbRO,
}, nil
}

func (s *storeImpl) Insert(ctx context.Context, ev Event) error {
return insertEvent(ctx, s.dbRW, s.table, ev)
}

func (s *storeImpl) Find(ctx context.Context, ev Event) (*Event, error) {
return findEvent(ctx, s.dbRO, s.table, ev)
}

// Returns the event in the descending order of timestamp (latest event first).
func (s *storeImpl) Get(ctx context.Context, since time.Time) ([]Event, error) {
return getEvents(ctx, s.dbRO, s.table, since)
}

func (s *storeImpl) Purge(ctx context.Context, beforeTimestamp int64) (int, error) {
return purgeEvents(ctx, s.dbRW, s.table, beforeTimestamp)
}

func createTable(ctx context.Context, db *sql.DB, tableName string) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}

Check warning on line 125 in components/db/events.go

View check run for this annotation

Codecov / codecov/patch

components/db/events.go#L124-L125

Added lines #L124 - L125 were not covered by tests

// create table
_, err = tx.ExecContext(ctx, fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (
%s INTEGER NOT NULL,
%s TEXT NOT NULL,
%s TEXT NOT NULL,
%s TEXT,
%s TEXT,
%s TEXT
);`, tableName,
ColumnTimestamp,
ColumnName,
ColumnType,
ColumnMessage,
ColumnExtraInfo,
ColumnSuggestedActions,
))
if err != nil {
_ = tx.Rollback()
return err
}

// create index on timestamp column
_, err = tx.ExecContext(ctx, fmt.Sprintf(`CREATE INDEX IF NOT EXISTS idx_%s_%s ON %s(%s);`,
tableName, ColumnTimestamp, tableName, ColumnTimestamp))
if err != nil {
_ = tx.Rollback()
return err
}

Check warning on line 154 in components/db/events.go

View check run for this annotation

Codecov / codecov/patch

components/db/events.go#L152-L154

Added lines #L152 - L154 were not covered by tests

return tx.Commit()
}

func insertEvent(ctx context.Context, db *sql.DB, tableName string, ev Event) error {
start := time.Now()
_, err := db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (%s, %s, %s, %s, %s, %s) VALUES (?, ?, ?, NULLIF(?, ''), NULLIF(?, ''), NULLIF(?, ''))",
tableName,
ColumnTimestamp,
ColumnName,
ColumnType,
ColumnMessage,
ColumnExtraInfo,
ColumnSuggestedActions,
),
ev.Timestamp,
ev.Name,
ev.Type,
ev.Message,
ev.ExtraInfo,
ev.SuggestedActions,
)
sqlite.RecordInsertUpdate(time.Since(start).Seconds())

return err
}

func findEvent(ctx context.Context, db *sql.DB, tableName string, ev Event) (*Event, error) {
selectStatement := fmt.Sprintf(`
SELECT %s, %s, %s, %s, %s, %s FROM %s WHERE %s = ? AND %s = ? AND %s = ?`,
ColumnTimestamp,
ColumnName,
ColumnType,
ColumnMessage,
ColumnExtraInfo,
ColumnSuggestedActions,
tableName,
ColumnTimestamp,
ColumnName,
ColumnType,
)
if ev.Message != "" {
selectStatement += fmt.Sprintf(" AND %s = ?", ColumnMessage)
}
if ev.ExtraInfo != "" {
selectStatement += fmt.Sprintf(" AND %s = ?", ColumnExtraInfo)
}
if ev.SuggestedActions != "" {
selectStatement += fmt.Sprintf(" AND %s = ?", ColumnSuggestedActions)
}

params := []any{ev.Timestamp, ev.Name, ev.Type}
if ev.Message != "" {
params = append(params, ev.Message)
}
if ev.ExtraInfo != "" {
params = append(params, ev.ExtraInfo)
}
if ev.SuggestedActions != "" {
params = append(params, ev.SuggestedActions)
}

row := db.QueryRowContext(ctx, selectStatement, params...)

foundEvent, err := scanRow(row)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
return nil, err
}

return &foundEvent, nil
}

// Returns the event in the descending order of timestamp (latest event first).
func getEvents(ctx context.Context, db *sql.DB, tableName string, since time.Time) ([]Event, error) {
query := fmt.Sprintf(`SELECT %s, %s, %s, %s, %s, %s
FROM %s
WHERE %s > ?
ORDER BY %s DESC`,
ColumnTimestamp, ColumnName, ColumnType, ColumnMessage, ColumnExtraInfo, ColumnSuggestedActions,
tableName,
ColumnTimestamp,
ColumnTimestamp,
)
params := []any{since.UTC().Unix()}

start := time.Now()
rows, err := db.QueryContext(ctx, query, params...)
sqlite.RecordSelect(time.Since(start).Seconds())

if err != nil {
if err == sql.ErrNoRows {
return nil, nil
}

Check warning on line 250 in components/db/events.go

View check run for this annotation

Codecov / codecov/patch

components/db/events.go#L249-L250

Added lines #L249 - L250 were not covered by tests
return nil, err
}
defer rows.Close()

events := []Event{}
for rows.Next() {
event, err := scanRows(rows)
if err != nil {
return nil, err
}

Check warning on line 260 in components/db/events.go

View check run for this annotation

Codecov / codecov/patch

components/db/events.go#L259-L260

Added lines #L259 - L260 were not covered by tests
events = append(events, event)
}
if len(events) == 0 {
return nil, nil
}
return events, nil
}

func scanRow(row *sql.Row) (Event, error) {
var event Event
var msg sql.NullString
var extraInfo sql.NullString
var suggestedActions sql.NullString
err := row.Scan(
&event.Timestamp,
&event.Name,
&event.Type,
&msg,
&extraInfo,
&suggestedActions,
)
if err == nil {
if msg.Valid {
event.Message = msg.String
}
if extraInfo.Valid {
event.ExtraInfo = extraInfo.String
}
if suggestedActions.Valid {
event.SuggestedActions = suggestedActions.String
}
}
return event, err
}

func scanRows(rows *sql.Rows) (Event, error) {
var event Event
var msg sql.NullString
var extraInfo sql.NullString
var suggestedActions sql.NullString
err := rows.Scan(
&event.Timestamp,
&event.Name,
&event.Type,
&msg,
&extraInfo,
&suggestedActions,
)
if err == nil {
if msg.Valid {
event.Message = msg.String
}
if extraInfo.Valid {
event.ExtraInfo = extraInfo.String
}
if suggestedActions.Valid {
event.SuggestedActions = suggestedActions.String
}
}
return event, err
}

func purgeEvents(ctx context.Context, db *sql.DB, tableName string, beforeTimestamp int64) (int, error) {
log.Logger.Debugw("purging events")
deleteStatement := fmt.Sprintf(`DELETE FROM %s WHERE %s < ?`,
tableName,
ColumnTimestamp,
)

start := time.Now()
rs, err := db.ExecContext(ctx, deleteStatement, beforeTimestamp)
if err != nil {
return 0, err
}

Check warning on line 334 in components/db/events.go

View check run for this annotation

Codecov / codecov/patch

components/db/events.go#L333-L334

Added lines #L333 - L334 were not covered by tests
sqlite.RecordDelete(time.Since(start).Seconds())

affected, err := rs.RowsAffected()
if err != nil {
return 0, err
}

Check warning on line 340 in components/db/events.go

View check run for this annotation

Codecov / codecov/patch

components/db/events.go#L339-L340

Added lines #L339 - L340 were not covered by tests
return int(affected), nil
}
Loading
Loading