Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor the app to be more idiomatic #1

Merged
merged 3 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ docker compose build
Необязательные:

* `DEBUG` (false) - включение режима отладки
* `CHECK` (true) - включение проверки Icecast
* `SKIP_CHECK` (false) - отключение проверки статуса Icecast
* `CHECK_URL` (http://icecast:8000/status-json.xsl) - URL проверки
* `CHECK_INTERVAL` (60) - интервал проверки
* `CHECK_INTERVAL` (60s) - интервал проверки
* `CHECK_TIMEOUT` (5s) - таймаут проверки
* `STREAM_URL` (https://stream.radio-t.com) - URL потока вещания
* `FFMPEG_PATH` (/usr/bin/ffmpeg) - путь до ffmpeg
* `TG_SERVER` (dc4-1.rtmp.t.me) - адрес сервера Telegram для приема потока. Выдается при старте вещания в Telegram

```bash
Expand All @@ -44,7 +46,9 @@ docker compose up -d

## Без контейнера

1. Установить `ffmpeg` и `nushell`
1. Установить `ffmpeg`
2. Создать чат в Telegram.
3. Запустить в чате аудио-видео звонок в режиме стрима.
4. `TG_KEY=111:AAA nu ./entrypoint.nu`
4. Собрать исполняемый файл. `go build`
5.a Запустить `TG_KEY=111:AAA ./tg-retrans`
5.b Или запустить `TG_KEY=111:AAA nu ./entrypoint.nu`
12 changes: 8 additions & 4 deletions docker-compose.yml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,18 @@ services:
environment:
# Optional. Enable debug info. Default: false
#- DEBUG=false
# Optional. Enable check. Default: false
#- CHECK=false
# Optional. Disable icecast status check. Default: false
#- SKIP_CHECK=false
# Optional. Configure check URL. Default: http://icecast:8000/status-json.xsl
#- CHECK_URL=http://icecast:8000/status-json.xsl
# Optional. Configure check interval in seconds. Default: 60
#- CHECK_INTERVAL=60
# Optional. Configure check interval in seconds. Default: 60s
#- CHECK_INTERVAL=60s
# Optional. Configure check timeout in seconds. Default: 5s
#- CHECK_INTERVAL=5s
# Optional. Configure source stream URL. Default: https://stream.radio-t.com
#- STREAM_URL=https://stream.radio-t.com
# Optional. Path to ffmpeg binary. Default: /usr/bin/ffmpeg
#- FFMPEG_PATH=/usr/bin/ffmpeg
# Optional. Configure name of Telegram streaming host. Default: dc4-1.rtmp.t.me
#- TG_SERVER=dc4-1.rtmp.t.me
# Mandatory. Configure key for Telegram chat audio-video conference call.
Expand Down
221 changes: 139 additions & 82 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,123 +1,180 @@
package main

import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"os/exec"
"os/signal"
"syscall"
"time"

"github.com/go-pkgz/lgr"
"github.com/umputun/go-flags"
)

type Options struct {
Check bool `long:"check" env:"CHECK" description:"Enable status check"`
var opts struct {
CheckURL string `long:"check-url" env:"CHECK_URL" default:"http://icecast:8000/status-json.xsl" description:"URL to check the stream status"`
CheckInterval time.Duration `long:"check-interval" env:"CHECK_INTERVAL" default:"60s" description:"Interval for status checks (in seconds)"`
StreamURL string `long:"stream-url" env:"STREAM_URL" default:"https://stream.radio-t.com" description:"Source stream URL"`
TGServer string `long:"tg-server" env:"TG_SERVER" default:"dc4-1.rtmp.t.me" description:"Telegram server"`
TGKey string `long:"tg-key" env:"TG_KEY" required:"true" description:"Telegram stream key"`
Debug bool `long:"debug" env:"DEBUG" description:"Enable debug mode"`
CheckInterval time.Duration `long:"check-interval" env:"CHECK_INTERVAL" default:"60s" description:"Interval for status checks"`
CheckTimeout time.Duration `long:"check-timeout" env:"CHECK_TIMEOUT" default:"5s" description:"Timeout for status check"`

StreamURL string `long:"stream-url" env:"STREAM_URL" default:"https://stream.radio-t.com" description:"Source stream URL"`
FfmpegPath string `long:"ffmpeg-path" env:"FFMPEG_PATH" default:"/usr/bin/ffmpeg" description:"Path to ffmpeg binary"`
SkipCheck bool `long:"skip-check" env:"SKIP_CHECK" description:"Disable status check"`
TGServer string `long:"tg-server" env:"TG_SERVER" default:"dc4-1.rtmp.t.me" description:"Telegram server"`
TGKey string `long:"tg-key" env:"TG_KEY" required:"true" description:"Telegram stream key"`
Debug bool `long:"debug" env:"DEBUG" description:"Enable debug mode"`
}

var (
opts Options
log = lgr.New(lgr.Msec)
)
var revision = "unknown"

func main() {
fmt.Printf("tg-retrans, %s\n", revision)
if _, err := flags.Parse(&opts); err != nil {
log.Printf("[ERROR] failed to parse flags: %v", err)
os.Exit(2)
}
setupLog(opts.Debug)

ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()

if err := run(ctx); err != nil {
log.Fatalf("[ERROR] run failed: %v", err)
}
log.Printf("[INFO] completed")
}

func checkStatus() {
status := false
for !status {
log.Logf("[DEBUG] Checking: %s", opts.CheckURL)
resp, err := http.Get(opts.CheckURL)
if err != nil {
log.Logf("[DEBUG] FAIL: %v", err)
time.Sleep(opts.CheckInterval)
continue
// run is the main function that starts the retranslation process
// it is one-shot if SkipCheck is set, otherwise it runs in a loop with a check interval
func run(ctx context.Context) error {
if opts.SkipCheck {
if err := startRetrans(ctx); err != nil {
return fmt.Errorf("failed to start retranslation: %w", err)
}
defer resp.Body.Close()

var data map[string]interface{}
if err := json.NewDecoder(resp.Body).Decode(&data); err == nil {
if icestats, ok := data["icestats"].(map[string]interface{}); ok {
if sources, ok := icestats["source"]; ok && sources != nil {
log.Logf("[DEBUG] SUCCESS")
status = true
continue
}
}
if !checkStreamStatus(ctx) {
return fmt.Errorf("stream is not available")
}
return nil
}

for {
// cancel the context if the parent is done
select {
case <-ctx.Done():
return ctx.Err()
default:
}

log.Logf("[DEBUG] FAIL: invalid data")
log.Logf("[DEBUG] Next check in %s", opts.CheckInterval)
if checkStreamStatus(ctx) {
log.Print("[INFO] Stream is available, start retranslation")
if err := startRetrans(ctx); err != nil {
log.Printf("[WARN] failed to start retranslation: %v", err)
}
} else {
log.Printf("[DEBUG] Not streaming, next check in %v", opts.CheckInterval)
}
time.Sleep(opts.CheckInterval)
}
}

func startWork() {
destURL := fmt.Sprintf("rtmps://%s/s/%s", opts.TGServer, opts.TGKey)
// checkStreamStatus checks if the stream is available
func checkStreamStatus(ctx context.Context) bool {
log.Printf("[DEBUG] Checking stream with %s", opts.CheckURL)
client := http.Client{Timeout: opts.CheckTimeout}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, opts.CheckURL, http.NoBody)
if err != nil {
log.Printf("[WARN] Can't make request to %s: %v", opts.CheckURL, err)
return false
}
resp, err := client.Do(req)
if err != nil {
log.Printf("[WARN] Can't get response from %s: %v", opts.CheckURL, err)
return false
}
defer resp.Body.Close()

log.Logf("[INFO] Start retranslation")
log.Logf("[INFO] Source: %s", opts.StreamURL)
log.Logf("[INFO] Destination: %s", destURL)

runOpts := []string{
"-v", "verbose",
"-nostdin",
"-nostats",
"-hide_banner",
"-loop", "1",
"-i", "logo-dark.png",
"-i", opts.StreamURL,
"-c:v", "libx264",
"-tune", "stillimage",
"-pix_fmt", "yuv420p",
"-c:a", "aac",
"-b:a", "128k",
"-ac", "1",
"-ar", "44100",
"-f", "flv",
"-rtmp_live", "-1",
destURL,
if resp.StatusCode != http.StatusOK {
log.Printf("[DEBUG] Invalid status code for %s: %d", opts.CheckURL, resp.StatusCode)
return false
}

log.Logf("[DEBUG] Run options: %v", runOpts)
data := map[string]any{}
if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
log.Printf("[WARN] Failed to decode response: %v", err)
return false
}

cmd := exec.Command("/usr/bin/ffmpeg", runOpts...)
if opts.Debug {
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stdout
} else {
cmd.Stdout = nil
cmd.Stderr = nil
icestats, ok := data["icestats"].(map[string]any)
if !ok {
log.Printf("[WARN] Missing icestats key in response")
return false
}

if err := cmd.Run(); err != nil {
log.Logf("[ERROR] Failed to run ffmpeg: %v", err)
if sources, ok := icestats["source"]; !ok || sources == nil {
log.Printf("[WARN] Missing or empty source in icestats response")
return false
}

log.Logf("[INFO] End retranslation")
log.Printf("[DEBUG] Status check passed")
return true
}

func main() {
parser := flags.NewParser(&opts, flags.Default)
if _, err := parser.Parse(); err != nil {
log.Logf("[ERROR] Can't read params")
os.Exit(1)
func startRetrans(ctx context.Context) error {
// spawnFFmpeg creates and runs ffmpeg process
spawnFFmpeg := func(destURL string) error {
runOpts := []string{
"-v", "verbose",
"-nostdin",
"-nostats",
"-hide_banner",
"-loop", "1",
"-i", "logo-dark.png",
"-i", opts.StreamURL,
"-c:v", "libx264",
"-tune", "stillimage",
"-pix_fmt", "yuv420p",
"-c:a", "aac",
"-b:a", "128k",
"-ac", "1",
"-ar", "44100",
"-f", "flv",
"-rtmp_live", "-1",
destURL,
}

log.Printf("[DEBUG] Run options: %v", runOpts)
cmd := exec.CommandContext(ctx, opts.FfmpegPath, runOpts...)

if opts.Debug {
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stdout
}

if err := cmd.Run(); err != nil {
return fmt.Errorf("failed to run ffmpeg: %w", err)
}
return nil
}

if opts.Debug {
log = lgr.New(lgr.Msec, lgr.Debug, lgr.CallerFunc)
log.Logf("[DEBUG] Debug mode enabled")
log.Logf("Options: %+v\n", opts)
destURL := fmt.Sprintf("rtmps://%s/s/%s", opts.TGServer, opts.TGKey)
log.Printf("[INFO] Start retranslation from %s to %s", opts.StreamURL, destURL)
start := time.Now()
if err := spawnFFmpeg(destURL); err != nil {
return fmt.Errorf("failed to start retranslation: %w", err)
}

for {
if opts.Check {
checkStatus()
}
startWork()
log.Printf("[INFO] End retranslation in %v", time.Since(start))
return nil
}

func setupLog(dbg bool) {
logOpts := []lgr.Option{lgr.Msec, lgr.LevelBraces, lgr.StackTraceOnError}
if dbg {
logOpts = []lgr.Option{lgr.Debug, lgr.CallerFile, lgr.CallerFunc, lgr.Msec, lgr.LevelBraces, lgr.StackTraceOnError}
}
lgr.SetupStdLogger(logOpts...)
}
Loading