Skip to content

Commit

Permalink
Add /debug endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
csfrancis committed Sep 26, 2024
1 parent 6a83520 commit 9c879f8
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 10 deletions.
2 changes: 1 addition & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func main() {
stop := make(chan os.Signal, 1)
signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)

server, err := proxytv.NewServer(config, provider)
server, err := proxytv.NewServer(config, provider, gitCommit)
if err != nil {
log.Fatalf("failed to create server: %v", err)
}
Expand Down
13 changes: 10 additions & 3 deletions provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,10 @@ type Provider struct {
baseAddress string
filters []*Filter

playlist *playlistLoader
epg *xmltv.TV
epgData []byte
playlist *playlistLoader
epg *xmltv.TV
epgData []byte
lastRefresh time.Time
}

func NewProvider(config *Config) (*Provider, error) {
Expand Down Expand Up @@ -298,6 +299,8 @@ func (p *Provider) Refresh() error {
xmlHeader := []byte("<?xml version=\"1.0\" encoding=\"UTF-8\"?><!DOCTYPE tv SYSTEM \"xmltv.dtd\">")
p.epgData = append(xmlHeader, xmlData...)

p.lastRefresh = time.Now()

return nil
}

Expand All @@ -317,3 +320,7 @@ func (p *Provider) GetTrack(idx int) *Track {
}
return &p.playlist.tracks[idx]
}

func (p *Provider) GetLastRefresh() time.Time {
return p.lastRefresh
}
65 changes: 60 additions & 5 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,28 @@ import (
"syscall"
"time"

"runtime"

"sync/atomic"

"github.com/gin-gonic/gin"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"
)

var startTime = time.Now()

type Server struct {
listenAddress string
router *gin.Engine
server *http.Server
provider *Provider
useFfmpeg bool
streamsSem *semaphore.Weighted
maxStreams int64
activeStreams int64
totalStreams int64
version string
}

func logrusLogFormatter(param gin.LogFormatterParams) string {
Expand All @@ -40,13 +50,17 @@ func logrusLogFormatter(param gin.LogFormatterParams) string {
return ""
}

func NewServer(config *Config, provider *Provider) (*Server, error) {
func NewServer(config *Config, provider *Provider, version string) (*Server, error) {
server := &Server{
listenAddress: config.ListenAddress,
router: gin.New(),
provider: provider,
useFfmpeg: config.UseFFMPEG,
streamsSem: semaphore.NewWeighted(int64(config.MaxStreams)),
maxStreams: int64(config.MaxStreams),
activeStreams: 0,
totalStreams: 0,
version: version,
}

server.router.Use(gin.LoggerWithFormatter(logrusLogFormatter))
Expand All @@ -71,19 +85,21 @@ func (s *Server) getEpgXML() gin.HandlerFunc {
}

func (s *Server) remuxStream(c *gin.Context, track *Track, channelID int) {
var err error

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

if err = s.streamsSem.Acquire(ctx, 1); err != nil {
if err := s.streamsSem.Acquire(ctx, 1); err != nil {
log.WithFields(log.Fields{
"channelId": channelID,
}).Warn("max streams reached")
c.String(429, "Too many requests")
return
}
defer s.streamsSem.Release(1)
atomic.AddInt64(&s.activeStreams, 1)
defer func() {
s.streamsSem.Release(1)
atomic.AddInt64(&s.activeStreams, -1)
}()

logger := log.WithFields(log.Fields{
"url": track.URI.String(),
Expand Down Expand Up @@ -121,6 +137,8 @@ func (s *Server) remuxStream(c *gin.Context, track *Track, channelID int) {
}
}()

atomic.AddInt64(&s.totalStreams, 1)

bytesWritten := int64(0)
continueStream := true
c.Header("Content-Type", `video/mpeg; codecs="avc1.4D401E"`)
Expand Down Expand Up @@ -207,6 +225,42 @@ func (s *Server) streamChannel() gin.HandlerFunc {
}
}

func (s *Server) debug() gin.HandlerFunc {
return func(c *gin.Context) {
var m runtime.MemStats
runtime.ReadMemStats(&m)

numGoroutines := runtime.NumGoroutine()
numCPU := runtime.NumCPU()
activeStreams := atomic.LoadInt64(&s.activeStreams)
totalStreams := atomic.LoadInt64(&s.totalStreams)

metrics := gin.H{
"status": "ok",
"version": s.version,
"system": gin.H{
"memory": gin.H{
"alloc": m.Alloc,
"totalAlloc": m.TotalAlloc,
"sys": m.Sys,
"numGC": m.NumGC,
},
"goroutines": numGoroutines,
"cpus": numCPU,
},
"uptime": time.Since(startTime).String(),
"streams": gin.H{
"active": activeStreams,
"max": s.maxStreams,
"total": totalStreams,
"lastRefresh": s.provider.GetLastRefresh().Format(time.RFC3339),
},
}

c.JSON(200, metrics)
}
}

func (s *Server) Start(p *Provider) chan error {
s.router.GET("/ping", func(c *gin.Context) {
c.String(200, "PONG")
Expand All @@ -216,6 +270,7 @@ func (s *Server) Start(p *Provider) chan error {
s.router.GET("/epg.xml", s.getEpgXML())
s.router.GET("/channel/:channelId", s.streamChannel())
s.router.PUT("/refresh", s.refresh())
s.router.GET("/debug", s.debug())

s.server = &http.Server{
Addr: s.listenAddress,
Expand Down
2 changes: 1 addition & 1 deletion server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestServerStartStop(t *testing.T) {
}

// Create a new server
server, err := NewServer(config, provider)
server, err := NewServer(config, provider, "test")
require.NoError(t, err)

// Start the server
Expand Down

0 comments on commit 9c879f8

Please sign in to comment.