Skip to content

Commit

Permalink
feat(components/db): add common events store
Browse files Browse the repository at this point in the history
Signed-off-by: Gyuho Lee <[email protected]>
  • Loading branch information
gyuho committed Jan 24, 2025
1 parent e135b52 commit aebe388
Show file tree
Hide file tree
Showing 2 changed files with 1,086 additions and 0 deletions.
263 changes: 263 additions & 0 deletions components/db/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
package db

import (
"context"
"database/sql"
"errors"
"fmt"
"time"

"github.com/leptonai/gpud/log"
"github.com/leptonai/gpud/pkg/sqlite"

_ "github.com/mattn/go-sqlite3"
)

const (
// ColumnTimestamp is the name of the column holding the event timestamp
// in unix seconds.
ColumnTimestamp = "timestamp"

// ColumnName is the name of the column holding the event name,
// e.g., "memory_oom", "memory_oom_kill_constraint", "memory_oom_cgroup", "memory_edac_correctable_errors".
ColumnName = "name"

// ColumnType is the name of the column holding the event type,
// e.g., "Unknown", "Info", "Warning", "Critical", "Fatal".
ColumnType = "type"

// ColumnExtraInfo is the name of the column holding optional extra
// information about the event, e.g.,
// data source: "dmesg", "nvml", "nvidia-smi".
// event target: "gpu_id", "gpu_uuid".
// log detail: "oom_reaper: reaped process 345646 (vector), now anon-rss:0kB, file-rss:0kB, shmem-rss:0".
ColumnExtraInfo = "extra_info"

// ColumnSuggestedActions is the name of the column holding the optional
// suggested actions for the event, e.g., "reboot".
ColumnSuggestedActions = "suggested_actions"
)

// Event is one row of an events table.
//
// ExtraInfo and SuggestedActions are optional: an empty string is stored as
// NULL on insert, and a NULL column is read back as an empty string.
type Event struct {
// Timestamp is the event time in unix seconds.
Timestamp int64 `json:"timestamp"`
// Name is the event name (stored in the "name" column).
Name string `json:"name"`
// Type is the event type (stored in the "type" column).
Type string `json:"type"`
// ExtraInfo is optional free-form detail about the event.
ExtraInfo string `json:"extra_info"`
// SuggestedActions is the optional suggested remediation for the event.
SuggestedActions string `json:"suggested_actions"`
}

// storeImpl is an event store bound to a single table.
type storeImpl struct {
// table is the name of the table every statement of this store targets.
table string
// dbRW is the handle used for statements that modify the database
// (table creation, Insert, Purge).
dbRW *sql.DB
// dbRO is the handle used for read-only queries (Find, Get).
dbRO *sql.DB
}

// NewStore returns an event store bound to the table named tableName.
//
// The table and its timestamp index are created through dbRW if they do not
// exist yet. The returned store issues writes through dbRW and reads through
// dbRO. A non-nil error means the table could not be created; in that case
// the returned store is nil.
func NewStore(ctx context.Context, dbRW *sql.DB, dbRO *sql.DB, tableName string) (*storeImpl, error) {
	err := createTable(ctx, dbRW, tableName)
	if err != nil {
		return nil, err
	}

	store := &storeImpl{table: tableName, dbRW: dbRW, dbRO: dbRO}
	return store, nil
}

// Insert stores ev as a new row in the store's table, using the read-write
// database handle.
func (s *storeImpl) Insert(ctx context.Context, ev Event) error {
	db, table := s.dbRW, s.table
	return insertEvent(ctx, db, table, ev)
}

// Find looks up an event matching ev in the store's table, using the
// read-only database handle. It returns (nil, nil) when no row matches.
func (s *storeImpl) Find(ctx context.Context, ev Event) (*Event, error) {
	db, table := s.dbRO, s.table
	return findEvent(ctx, db, table, ev)
}

// Get returns the events whose timestamp is strictly after since, latest
// first, using the read-only database handle. It returns (nil, nil) when
// there are none.
func (s *storeImpl) Get(ctx context.Context, since time.Time) ([]Event, error) {
	db, table := s.dbRO, s.table
	return getEvents(ctx, db, table, since)
}

// Purge deletes the events whose timestamp is strictly less than
// beforeTimestamp, using the read-write database handle, and returns the
// number of deleted rows.
func (s *storeImpl) Purge(ctx context.Context, beforeTimestamp int64) (int, error) {
	db, table := s.dbRW, s.table
	return purgeEvents(ctx, db, table, beforeTimestamp)
}

func createTable(ctx context.Context, db *sql.DB, tableName string) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()

Check failure on line 87 in components/db/events.go

View workflow job for this annotation

GitHub Actions / golangci-lint

Error return value of `tx.Rollback` is not checked (errcheck)

// create table
_, err = tx.ExecContext(ctx, fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (
%s INTEGER NOT NULL,
%s TEXT NOT NULL,
%s TEXT NOT NULL,
%s TEXT,
%s TEXT
);`, tableName,
ColumnTimestamp,
ColumnName,
ColumnType,
ColumnExtraInfo,
ColumnSuggestedActions,
))
if err != nil {
return err
}

// create index on timestamp column
_, err = tx.ExecContext(ctx, fmt.Sprintf(`CREATE INDEX IF NOT EXISTS idx_%s_%s ON %s(%s);`,
tableName, ColumnTimestamp, tableName, ColumnTimestamp))
if err != nil {
return err
}

return tx.Commit()
}

// insertEvent inserts ev as a new row into tableName.
//
// ExtraInfo and SuggestedActions are passed through NULLIF(?, ''), so empty
// strings are stored as NULL. The elapsed time is reported via
// sqlite.RecordInsertUpdate whether or not the statement succeeded.
func insertEvent(ctx context.Context, db *sql.DB, tableName string, ev Event) error {
	begin := time.Now()

	stmt := fmt.Sprintf("INSERT INTO %s (%s, %s, %s, %s, %s) VALUES (?, ?, ?, NULLIF(?, ''), NULLIF(?, ''))",
		tableName,
		ColumnTimestamp, ColumnName, ColumnType, ColumnExtraInfo, ColumnSuggestedActions,
	)
	args := []any{ev.Timestamp, ev.Name, ev.Type, ev.ExtraInfo, ev.SuggestedActions}

	_, err := db.ExecContext(ctx, stmt, args...)
	sqlite.RecordInsertUpdate(time.Since(begin).Seconds())

	return err
}

// findEvent looks up one row in tableName that matches ev.
//
// Timestamp, Name and Type must always match. ExtraInfo and SuggestedActions
// are added as filters only when they are non-empty in ev; an empty value
// means "do not filter on this column". It returns (nil, nil) when no row
// matches. NULL optional columns are returned as empty strings.
func findEvent(ctx context.Context, db *sql.DB, tableName string, ev Event) (*Event, error) {
	query := fmt.Sprintf(`
SELECT %s, %s, %s, %s, %s FROM %s WHERE %s = ? AND %s = ? AND %s = ?`,
		ColumnTimestamp, ColumnName, ColumnType, ColumnExtraInfo, ColumnSuggestedActions,
		tableName,
		ColumnTimestamp, ColumnName, ColumnType,
	)
	args := []any{ev.Timestamp, ev.Name, ev.Type}

	// Extend the WHERE clause and its arguments together so they stay in sync.
	if ev.ExtraInfo != "" {
		query += fmt.Sprintf(" AND %s = ?", ColumnExtraInfo)
		args = append(args, ev.ExtraInfo)
	}
	if ev.SuggestedActions != "" {
		query += fmt.Sprintf(" AND %s = ?", ColumnSuggestedActions)
		args = append(args, ev.SuggestedActions)
	}

	var (
		found   Event
		extra   sql.NullString
		actions sql.NullString
	)
	err := db.QueryRowContext(ctx, query, args...).Scan(
		&found.Timestamp,
		&found.Name,
		&found.Type,
		&extra,
		&actions,
	)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		// No match is not an error for callers.
		return nil, nil
	case err != nil:
		return nil, err
	}

	if extra.Valid {
		found.ExtraInfo = extra.String
	}
	if actions.Valid {
		found.SuggestedActions = actions.String
	}
	return &found, nil
}

// getEvents returns the events in tableName whose timestamp is strictly
// greater than since (compared in unix seconds), in descending order of
// timestamp (latest event first).
//
// It returns (nil, nil) when no event matches. NULL optional columns are
// returned as empty strings. The query time is reported via
// sqlite.RecordSelect whether or not the query succeeded.
func getEvents(ctx context.Context, db *sql.DB, tableName string, since time.Time) ([]Event, error) {
	query := fmt.Sprintf(`SELECT %s, %s, %s, %s, %s
FROM %s
WHERE %s > ?
ORDER BY %s DESC`,
		ColumnTimestamp, ColumnName, ColumnType, ColumnExtraInfo, ColumnSuggestedActions,
		tableName,
		ColumnTimestamp,
		ColumnTimestamp,
	)
	params := []any{since.UTC().Unix()}

	start := time.Now()
	rows, err := db.QueryContext(ctx, query, params...)
	sqlite.RecordSelect(time.Since(start).Seconds())

	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			return nil, nil
		}
		return nil, err
	}
	defer rows.Close()

	// A nil slice is returned as-is when the query yields no rows.
	var events []Event
	for rows.Next() {
		var event Event
		var extraInfo sql.NullString
		var suggestedActions sql.NullString
		if err := rows.Scan(
			&event.Timestamp,
			&event.Name,
			&event.Type,
			&extraInfo,
			&suggestedActions,
		); err != nil {
			return nil, err
		}
		if extraInfo.Valid {
			event.ExtraInfo = extraInfo.String
		}
		if suggestedActions.Valid {
			event.SuggestedActions = suggestedActions.String
		}
		events = append(events, event)
	}
	// rows.Next also returns false when iteration fails part-way (e.g. the
	// context is canceled); without this check a truncated result would be
	// reported as success.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return events, nil
}

// purgeEvents deletes every row in tableName whose timestamp is strictly less
// than beforeTimestamp and returns the number of rows deleted.
//
// The elapsed time is reported via sqlite.RecordDelete only when the DELETE
// statement itself succeeded.
func purgeEvents(ctx context.Context, db *sql.DB, tableName string, beforeTimestamp int64) (int, error) {
	log.Logger.Debugw("purging events")

	query := fmt.Sprintf(`DELETE FROM %s WHERE %s < ?`, tableName, ColumnTimestamp)

	begin := time.Now()
	result, execErr := db.ExecContext(ctx, query, beforeTimestamp)
	if execErr != nil {
		return 0, execErr
	}
	sqlite.RecordDelete(time.Since(begin).Seconds())

	deleted, err := result.RowsAffected()
	if err != nil {
		return 0, err
	}
	return int(deleted), nil
}
Loading

0 comments on commit aebe388

Please sign in to comment.