Skip to content

Commit

Permalink
Reduce mist client timeout for catabalancer
Browse files Browse the repository at this point in the history
  • Loading branch information
mjh1 committed Feb 24, 2025
1 parent 35bd6f5 commit e500951
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 36 deletions.
28 changes: 4 additions & 24 deletions balancer/catabalancer/catalyst_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,41 +411,21 @@ func (c *CataBalancer) MistUtilLoadSource(ctx context.Context, streamID, lat, lo
return "", fmt.Errorf("catabalancer no node found for ingest stream: %s stale: false", streamID)
}

var updateNodeStatsEvery = 5 * time.Second
var StatsUpdateInterval = 5 * time.Second

func isStale(timestamp time.Time, stale time.Duration) bool {
return time.Since(timestamp) >= stale
}

func StartMetricSending(nodeName string, latitude float64, longitude float64, mist clients.MistAPIClient, nodeStatsDB *sql.DB) {
ticker := time.NewTicker(updateNodeStatsEvery)
ticker := time.NewTicker(StatsUpdateInterval)
go func() {
for range ticker.C {
sendWithTimeout(nodeName, latitude, longitude, mist, nodeStatsDB)
sendMetrics(nodeName, latitude, longitude, mist, nodeStatsDB)
}
}()
}

func sendWithTimeout(nodeName string, latitude float64, longitude float64, mist clients.MistAPIClient, nodeStatsDB *sql.DB) {
ctx, cancel := context.WithTimeout(context.Background(), updateNodeStatsEvery)
defer cancel()

done := make(chan struct{})

go func() {
sendMetrics(nodeName, latitude, longitude, mist, nodeStatsDB)
close(done) // Signal completion
}()

// Wait for either the function to complete or timeout
select {
case <-done:
return
case <-ctx.Done():
log.LogNoRequestID("catabalancer send metrics timed out")
}
}

func sendMetrics(nodeName string, latitude float64, longitude float64, mist clients.MistAPIClient, nodeStatsDB *sql.DB) {
start := time.Now()
sysusage, err := GetSystemUsage()
Expand Down Expand Up @@ -498,7 +478,7 @@ func sendMetrics(nodeName string, latitude float64, longitude float64, mist clie

func sendMetricsToDB(nodeStatsDB *sql.DB, nodeName string, payload []byte) {
start := time.Now()
queryContext, cancel := context.WithTimeout(context.Background(), updateNodeStatsEvery)
queryContext, cancel := context.WithTimeout(context.Background(), StatsUpdateInterval)
defer cancel()
insertStatement := `insert into "node_stats"(
"node_id",
Expand Down
25 changes: 14 additions & 11 deletions clients/mist_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,21 @@ type MistClient struct {
HttpReqUrl string
configMu sync.Mutex
cache *cache.Cache
httpClient *http.Client
}

func NewMistAPIClient(user, password, host string, port int) MistAPIClient {
const MistClientTimeout = 1 * time.Minute

func NewMistAPIClient(user, password, host string, port int, timeout time.Duration) MistAPIClient {
if timeout == 0 {
timeout = MistClientTimeout
}
mist := &MistClient{
ApiUrl: fmt.Sprintf("http://%s:%d", host, port),
Username: user,
Password: password,
cache: cache.New(defaultCacheExpiration, cacheCleanupInterval),
ApiUrl: fmt.Sprintf("http://%s:%d", host, port),
Username: user,
Password: password,
cache: cache.New(defaultCacheExpiration, cacheCleanupInterval),
httpClient: newRetryableClient(&http.Client{Timeout: timeout}),
}
return mist
}
Expand Down Expand Up @@ -238,10 +245,6 @@ type MistPushStats struct {
Tracks []int `json:"tracks"`
}

const MIST_CLIENT_TIMEOUT = 1 * time.Minute

var mistRetryableClient = newRetryableClient(&http.Client{Timeout: MIST_CLIENT_TIMEOUT})

func (mc *MistClient) AddStream(streamName, sourceUrl string) error {
c := commandAddStream(streamName, sourceUrl)
return wrapErr(validateAddStream(mc.sendCommand(c)), streamName)
Expand Down Expand Up @@ -405,7 +408,7 @@ func (mc *MistClient) sendCommandToMist(command interface{}) (string, error) {
return "", err
}
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
resp, err := metrics.MonitorRequest(metrics.Metrics.MistClient, mistRetryableClient, req)
resp, err := metrics.MonitorRequest(metrics.Metrics.MistClient, mc.httpClient, req)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -437,7 +440,7 @@ func (mc *MistClient) sendHttpRequest(streamName string) (string, error) {
return "", err
}
req.Header.Add("Content-Type", "application/json")
resp, err := metrics.MonitorRequest(metrics.Metrics.MistClient, mistRetryableClient, req)
resp, err := metrics.MonitorRequest(metrics.Metrics.MistClient, mc.httpClient, req)
if err != nil {
return "", err
}
Expand Down
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func main() {
}

if cli.MistEnabled {
mist = clients.NewMistAPIClient(cli.MistUser, cli.MistPassword, cli.MistHost, cli.MistPort)
mist = clients.NewMistAPIClient(cli.MistUser, cli.MistPassword, cli.MistHost, cli.MistPort, 0)
}

catabalancerEnabled := balancer.CombinedBalancerEnabled(cli.CataBalancer)
Expand Down Expand Up @@ -267,6 +267,7 @@ func main() {

if catabalancerEnabled && nodeStatsDB != nil {
if cli.Tags["node"] == "media" { // don't announce load balancing availability for testing nodes
mist := clients.NewMistAPIClient(cli.MistUser, cli.MistPassword, cli.MistHost, cli.MistPort, catabalancer.StatsUpdateInterval-time.Second)
catabalancer.StartMetricSending(cli.NodeName, cli.NodeLatitude, cli.NodeLongitude, mist, nodeStatsDB)
}
}
Expand Down

0 comments on commit e500951

Please sign in to comment.