ai: add ai-stream-status endpoint & client #198
base: main
Changes from all commits
@@ -0,0 +1,117 @@
package ai

import (
    "context"
    "crypto/tls"
    "fmt"
    "math"
    "strings"

    "github.com/ClickHouse/clickhouse-go/v2"
    "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
    "github.com/Masterminds/squirrel"
)

const maxClickhouseResultRows = 1000

type AIStreamStatusEventRow struct {
    StreamID      string   `ch:"stream_id"`
    AvgInputFPS   float64  `ch:"avg_input_fps"`
    AvgOutputFPS  float64  `ch:"avg_output_fps"`
    ErrorCount    uint64   `ch:"error_count"`
    Errors        []string `ch:"errors"`
    TotalRestarts uint64   `ch:"total_restarts"`
    RestartLogs   []string `ch:"restart_logs"`
}

type Clickhouse interface {
    QueryAIStreamStatusEvents(ctx context.Context, spec QuerySpec) ([]AIStreamStatusEventRow, error)
}

type ClickhouseOptions struct {
    Addr     string
    User     string
    Password string
    Database string
}

type ClickhouseClient struct {
    conn driver.Conn
}

func NewClickhouseConn(opts ClickhouseOptions) (*ClickhouseClient, error) {
    conn, err := clickhouse.Open(&clickhouse.Options{
        Addr: strings.Split(opts.Addr, ","),
        Auth: clickhouse.Auth{
            Database: opts.Database,
            Username: opts.User,
            Password: opts.Password,
        },
        TLS: &tls.Config{},
    })
    if err != nil {
        return nil, err
    }
    return &ClickhouseClient{conn: conn}, nil
}

func (c *ClickhouseClient) QueryAIStreamStatusEvents(ctx context.Context, spec QuerySpec) ([]AIStreamStatusEventRow, error) {
    sql, args, err := buildAIStreamStatusEventsQuery(spec)
    if err != nil {
        return nil, fmt.Errorf("error building AI stream status events query: %w", err)
    }
    var res []AIStreamStatusEventRow
    err = c.conn.Select(ctx, &res, sql, args...)
    if err != nil {
        return nil, err
    }
    res = replaceNaN(res)
    return res, nil
}

func buildAIStreamStatusEventsQuery(spec QuerySpec) (string, []interface{}, error) {
    query := squirrel.Select(
        "stream_id",
        "avg(input_fps) as avg_input_fps",
        "avg(output_fps) as avg_output_fps",
        "countIf(last_error != '') as error_count",
        "arrayFilter(x -> x != '', groupUniqArray(last_error)) as errors",
        "sum(restart_count) as total_restarts",
[Review comment] This is also incorrect.

        "arrayFilter(x -> x != '', groupUniqArray(last_restart_logs)) as restart_logs").

[Review comment] Maybe only keep the last one as well? I think it would be confusing to merge+uniq these logs.

        From("stream_status").
        GroupBy("stream_id").
        Limit(maxClickhouseResultRows + 1)

    if spec.Filter.StreamID != "" {
        query = query.Where("stream_id = ?", spec.Filter.StreamID)
    }

    if spec.From != nil {
        query = query.Where("timestamp_ts > ?", spec.From)
    }

    if spec.To != nil {
        query = query.Where("timestamp_ts < ?", spec.To)
    }

    sql, args, err := query.ToSql()
    if err != nil {
        return "", nil, err
    }

    return sql, args, nil
}

func replaceNaN(rows []AIStreamStatusEventRow) []AIStreamStatusEventRow {
    var res []AIStreamStatusEventRow
    for _, r := range rows {
        if math.IsNaN(r.AvgInputFPS) {
            r.AvgInputFPS = 0.0
        }
        if math.IsNaN(r.AvgOutputFPS) {
            r.AvgOutputFPS = 0.0
        }
        res = append(res, r)
    }
    return res
}
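
The two review comments above question how restarts and restart logs are aggregated. As a hedged sketch of the "keep only the last one" suggestion, ClickHouse's argMax aggregate returns the value from the row with the greatest ordering key. The helper below is hypothetical, not part of this PR; it assumes stream_status has a timestamp_ts column, which the WHERE clauses in buildAIStreamStatusEventsQuery already reference.

// buildLatestRestartLogsQuery is a hypothetical sketch of the reviewer's
// suggestion: keep only the restart logs from the most recent row per stream
// (via argMax) instead of merging and deduplicating logs across all rows.
// Assumes a timestamp_ts column exists on stream_status.
func buildLatestRestartLogsQuery() (string, []interface{}, error) {
    return squirrel.Select(
        "stream_id",
        "argMax(last_restart_logs, timestamp_ts) as restart_logs").
        From("stream_status").
        GroupBy("stream_id").
        ToSql()
}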

@@ -0,0 +1,79 @@
package ai

import (
    "context"
    "errors"
    "fmt"

    livepeer "github.com/livepeer/go-api-client"
    "github.com/livepeer/livepeer-data/pkg/data"
    promClient "github.com/prometheus/client_golang/api"
)

var ErrAssetNotFound = errors.New("asset not found")

type StreamStatus struct {
    StreamID string `json:"streamId"`
[Review comment] Should we use …

    AvgInputFPS   data.Nullable[float64] `json:"avgInputFps"`
    AvgOutputFPS  data.Nullable[float64] `json:"avgOutputFps"`
    ErrorCount    uint64                 `json:"errorCount"`
    Errors        []string               `json:"errors"`
    TotalRestarts uint64                 `json:"totalRestarts"`
    RestartLogs   []string               `json:"restartLogs"`
}

type ClientOptions struct {
    Prometheus promClient.Config
    Livepeer   livepeer.ClientOptions
    ClickhouseOptions
}

type Client struct {
    opts       ClientOptions
    lp         *livepeer.Client
    clickhouse Clickhouse
}

func NewClient(opts ClientOptions) (*Client, error) {
    lp := livepeer.NewAPIClient(opts.Livepeer)

    clickhouse, err := NewClickhouseConn(opts.ClickhouseOptions)
    if err != nil {
        return nil, fmt.Errorf("error creating clickhouse client: %w", err)
    }

    return &Client{opts, lp, clickhouse}, nil
}

func (c *Client) QueryAIStreamStatusEvents(ctx context.Context, spec QuerySpec) ([]StreamStatus, error) {
    rows, err := c.clickhouse.QueryAIStreamStatusEvents(ctx, spec)
    if err != nil {
        return nil, err
    }
    metrics := aiStreamStatusEventsToStreamStatuses(rows, spec)
    return metrics, nil
}

func aiStreamStatusEventsToStreamStatuses(rows []AIStreamStatusEventRow, spec QuerySpec) []StreamStatus {
    streamStatuses := make([]StreamStatus, len(rows))
    for i, row := range rows {
        streamStatuses[i] = StreamStatus{
            StreamID:      row.StreamID,
            AvgInputFPS:   data.WrapNullable(row.AvgInputFPS),
            AvgOutputFPS:  data.WrapNullable(row.AvgOutputFPS),
            ErrorCount:    row.ErrorCount,
            Errors:        row.Errors,
            TotalRestarts: row.TotalRestarts,
            RestartLogs:   row.RestartLogs,
        }
    }
    return streamStatuses
}

func toFloat64Ptr(f float64, asked bool) data.Nullable[float64] {
    return data.ToNullable(f, true, asked)
}

func toStringPtr(s string, asked bool) data.Nullable[string] {
    return data.ToNullable(s, true, asked)
}
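
For reference, here is a minimal sketch of wiring up and calling the new client. It is written as if it lived inside the ai package, since the package's import path is not visible in this diff, and every option value is a placeholder.

// ExampleUsage is a hypothetical caller, not part of this PR; all option
// values are placeholders.
func ExampleUsage() {
    client, err := NewClient(ClientOptions{
        ClickhouseOptions: ClickhouseOptions{
            Addr:     "ch-1:9440,ch-2:9440", // comma-separated, split by NewClickhouseConn
            User:     "reader",
            Password: "secret",
            Database: "analytics",
        },
    })
    if err != nil {
        panic(err)
    }
    statuses, err := client.QueryAIStreamStatusEvents(context.Background(), NewQuerySpec("stream-1", nil, nil))
    if err != nil {
        panic(err)
    }
    fmt.Printf("%+v\n", statuses)
}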

@@ -0,0 +1,100 @@
package ai

import (
    "context"
    "encoding/json"
    "testing"

    "github.com/stretchr/testify/require"
)

type MockClickhouseClient struct {
    rows []AIStreamStatusEventRow
}

func (m MockClickhouseClient) QueryAIStreamStatusEvents(ctx context.Context, spec QuerySpec) ([]AIStreamStatusEventRow, error) {
    return m.rows, nil
}

func TestQueryAIStreamStatusEvents(t *testing.T) {
[Review comment] Nice

    require := require.New(t)

    tests := []struct {
        name    string
        spec    QuerySpec
        rows    []AIStreamStatusEventRow
        expJson string
    }{
        {
            name: "basic query with no errors",
            rows: []AIStreamStatusEventRow{
                {
                    StreamID:      "stream-1",
                    AvgInputFPS:   30.0,
                    AvgOutputFPS:  25.0,
                    ErrorCount:    0,
                    Errors:        []string{},
                    TotalRestarts: 1,
                    RestartLogs:   []string{"restart-log-1"},
                },
            },
            expJson: `
            [
                {
                    "streamId": "stream-1",
                    "avgInputFps": 30.0,
                    "avgOutputFps": 25.0,
                    "errorCount": 0,
                    "errors": [],
                    "totalRestarts": 1,
                    "restartLogs": ["restart-log-1"]
                }
            ]
            `,
        },
        {
            name: "query with errors",
            rows: []AIStreamStatusEventRow{
                {
                    StreamID:      "stream-2",
                    AvgInputFPS:   20.0,
                    AvgOutputFPS:  15.0,
                    ErrorCount:    2,
                    Errors:        []string{"error-1", "error-2"},
                    TotalRestarts: 3,
                    RestartLogs:   []string{"restart-log-2", "restart-log-3"},
                },
            },
            expJson: `
            [
                {
                    "streamId": "stream-2",
                    "avgInputFps": 20.0,
                    "avgOutputFps": 15.0,
                    "errorCount": 2,
                    "errors": ["error-1", "error-2"],
                    "totalRestarts": 3,
                    "restartLogs": ["restart-log-2", "restart-log-3"]
                }
            ]
            `,
        },
    }

    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            // given
            mockClickhouse := MockClickhouseClient{rows: tt.rows}
            client := Client{clickhouse: &mockClickhouse}

            // when
            res, err := client.QueryAIStreamStatusEvents(context.Background(), tt.spec)

            // then
            require.NoError(err)
            jsonRes, err := json.Marshal(res)
            require.NoError(err)
            require.JSONEq(tt.expJson, string(jsonRes))
        })
    }
}

@@ -0,0 +1,24 @@
package ai

import (
    "time"
)

type QueryFilter struct {
    StreamID string
}

type QuerySpec struct {
    From, To *time.Time
    Filter   QueryFilter
}

func NewQuerySpec(streamID string, from, to *time.Time) QuerySpec {
    return QuerySpec{
        From: from,
        To:   to,
        Filter: QueryFilter{
            StreamID: streamID,
        },
    }
}
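
For illustration, a hypothetical helper that builds a spec for the last 24 hours of a single placeholder stream:

// exampleSpec is a hypothetical sketch, not part of this PR: a QuerySpec
// covering the last 24 hours for one (placeholder) stream ID.
func exampleSpec() QuerySpec {
    now := time.Now()
    from := now.Add(-24 * time.Hour)
    return NewQuerySpec("stream-1", &from, &now)
}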

[Review comment] This is not an entirely correct aggregation; the right way would be aggregating the error events. How hard would that be? I expected the data pipeline itself to be doing that kind of aggregation though, so we're not scanning the table at query time.
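
One reading of the "data pipeline itself" remark is a ClickHouse materialized view that pre-aggregates at insert time, so queries read a small aggregate table instead of scanning stream_status. The DDL below is purely an illustrative assumption sketched to match the column names used in the query; nothing like it exists in this PR, and the target table would need an AggregatingMergeTree engine.

// createStreamStatusAggMV is a hypothetical pre-aggregation sketch, not part
// of this PR: a materialized view writing avg/sum aggregate states per stream
// into an AggregatingMergeTree table, queried later with avgMerge/sumMerge.
const createStreamStatusAggMV = `
CREATE MATERIALIZED VIEW IF NOT EXISTS stream_status_agg_mv
TO stream_status_agg
AS SELECT
    stream_id,
    avgState(input_fps)     AS avg_input_fps,
    avgState(output_fps)    AS avg_output_fps,
    sumState(restart_count) AS total_restarts
FROM stream_status
GROUP BY stream_id
`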