diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 4719d07e04..6b5ee3315e 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -11,6 +11,7 @@ - \#2282 Add checksums and gpg signature support with binary releases. (@hjpotter92) - \#2344 Use T.TempDir to create temporary test directory (@Juneezee) - \#2353 Codesign and notarize macOS binaries to be allowed to run without warnings on apple devices (@hjpotter92) +- \#2351 Refactor livepeer.go to enable running Livepeer node from the code (@leszko) #### Broadcaster - \#2309 Add dynamic timeout for the orchestrator discovery (@leszko) diff --git a/cmd/livepeer/livepeer.go b/cmd/livepeer/livepeer.go index 295c1ad957..e4a6c77d6b 100644 --- a/cmd/livepeer/livepeer.go +++ b/cmd/livepeer/livepeer.go @@ -73,103 +73,85 @@ const RtmpPort = "1935" const RpcPort = "8935" const CliPort = "7935" +type LivepeerConfig struct { + network *string + rtmpAddr *string + cliAddr *string + httpAddr *string + serviceAddr *string + orchAddr *string + verifierURL *string + ethController *string + verifierPath *string + localVerify *bool + httpIngest *bool + orchestrator *bool + transcoder *bool + broadcaster *bool + orchSecret *string + transcodingOptions *string + maxAttempts *int + selectRandFreq *float64 + maxSessions *int + currentManifest *bool + nvidia *string + testTranscoder *bool + sceneClassificationModelPath *string + ethAcctAddr *string + ethPassword *string + ethKeystorePath *string + ethOrchAddr *string + ethUrl *string + txTimeout *time.Duration + maxTxReplacements *int + gasLimit *int + minGasPrice *int64 + maxGasPrice *int + initializeRound *bool + ticketEV *string + maxTicketEV *string + depositMultiplier *int + pricePerUnit *int + maxPricePerUnit *int + pixelsPerUnit *int + autoAdjustPrice *bool + blockPollingInterval *int + redeemer *bool + redeemerAddr *string + reward *bool + monitor *bool + metricsPerStream *bool + metricsExposeClientIP *bool + metadataQueueUri *string + metadataAmqpExchange *string + metadataPublishTimeout *time.Duration + datadir *string + objectstore *string + recordstore *string + authWebhookURL *string + orchWebhookURL *string + detectionWebhookURL *string +} + func main() { // Override the default flag set since there are dependencies that // incorrectly add their own flags (specifically, due to the 'testing' // package being linked) flag.Set("logtostderr", "true") - usr, err := user.Current() - if err != nil { - glog.Fatalf("Cannot find current user: %v", err) - } vFlag := flag.Lookup("v") //We preserve this flag before resetting all the flags. Not a scalable approach, but it'll do for now. More discussions here - https://github.com/livepeer/go-livepeer/pull/617 flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError) + // Help & Log mistJson := flag.Bool("j", false, "Print application info as json") - - // Network & Addresses: - network := flag.String("network", "offchain", "Network to connect to") - rtmpAddr := flag.String("rtmpAddr", "127.0.0.1:"+RtmpPort, "Address to bind for RTMP commands") - cliAddr := flag.String("cliAddr", "127.0.0.1:"+CliPort, "Address to bind for CLI commands") - httpAddr := flag.String("httpAddr", "", "Address to bind for HTTP commands") - serviceAddr := flag.String("serviceAddr", "", "Orchestrator only. Overrides the on-chain serviceURI that broadcasters can use to contact this node; may be an IP or hostname.") - orchAddr := flag.String("orchAddr", "", "Comma-separated list of orchestrators to connect to") - verifierURL := flag.String("verifierUrl", "", "URL of the verifier to use") - - verifierPath := flag.String("verifierPath", "", "Path to verifier shared volume") - localVerify := flag.Bool("localVerify", true, "Set to true to enable local verification i.e. pixel count and signature verification.") - httpIngest := flag.Bool("httpIngest", true, "Set to true to enable HTTP ingest") - - // Transcoding: - orchestrator := flag.Bool("orchestrator", false, "Set to true to be an orchestrator") - transcoder := flag.Bool("transcoder", false, "Set to true to be a transcoder") - broadcaster := flag.Bool("broadcaster", false, "Set to true to be a broadcaster") - orchSecret := flag.String("orchSecret", "", "Shared secret with the orchestrator as a standalone transcoder") - transcodingOptions := flag.String("transcodingOptions", "P240p30fps16x9,P360p30fps16x9", "Transcoding options for broadcast job, or path to json config") - maxAttempts := flag.Int("maxAttempts", 3, "Maximum transcode attempts") - selectRandFreq := flag.Float64("selectRandFreq", 0.3, "Frequency to randomly select unknown orchestrators (on-chain mode only)") - maxSessions := flag.Int("maxSessions", 10, "Maximum number of concurrent transcoding sessions for Orchestrator, maximum number or RTMP streams for Broadcaster, or maximum capacity for transcoder") - currentManifest := flag.Bool("currentManifest", false, "Expose the currently active ManifestID as \"/stream/current.m3u8\"") - nvidia := flag.String("nvidia", "", "Comma-separated list of Nvidia GPU device IDs (or \"all\" for all available devices)") - testTranscoder := flag.Bool("testTranscoder", true, "Test Nvidia GPU transcoding at startup") - sceneClassificationModelPath := flag.String("sceneClassificationModelPath", "", "Path to scene classification model") - - // Onchain: - ethAcctAddr := flag.String("ethAcctAddr", "", "Existing Eth account address") - ethPassword := flag.String("ethPassword", "", "Password for existing Eth account address") - ethKeystorePath := flag.String("ethKeystorePath", "", "Path for the Eth Key") - ethOrchAddr := flag.String("ethOrchAddr", "", "ETH address of an on-chain registered orchestrator") - ethUrl := flag.String("ethUrl", "", "Ethereum node JSON-RPC URL") - txTimeout := flag.Duration("transactionTimeout", 5*time.Minute, "Amount of time to wait for an Ethereum transaction to confirm before timing out") - maxTxReplacements := flag.Int("maxTransactionReplacements", 1, "Number of times to automatically replace pending Ethereum transactions") - gasLimit := flag.Int("gasLimit", 0, "Gas limit for ETH transactions") - minGasPrice := flag.Int64("minGasPrice", 0, "Minimum gas price (priority fee + base fee) for ETH transactions in wei, 10 Gwei = 10000000000") - maxGasPrice := flag.Int("maxGasPrice", 0, "Maximum gas price (priority fee + base fee) for ETH transactions in wei, 40 Gwei = 40000000000") - ethController := flag.String("ethController", "", "Protocol smart contract address") - initializeRound := flag.Bool("initializeRound", false, "Set to true if running as a transcoder and the node should automatically initialize new rounds") - ticketEV := flag.String("ticketEV", "1000000000000", "The expected value for PM tickets") - // Broadcaster max acceptable ticket EV - maxTicketEV := flag.String("maxTicketEV", "3000000000000", "The maximum acceptable expected value for PM tickets") - // Broadcaster deposit multiplier to determine max acceptable ticket faceValue - depositMultiplier := flag.Int("depositMultiplier", 1, "The deposit multiplier used to determine max acceptable faceValue for PM tickets") - // Orchestrator base pricing info - pricePerUnit := flag.Int("pricePerUnit", 0, "The price per 'pixelsPerUnit' amount pixels") - // Broadcaster max acceptable price - maxPricePerUnit := flag.Int("maxPricePerUnit", 0, "The maximum transcoding price (in wei) per 'pixelsPerUnit' a broadcaster is willing to accept. If not set explicitly, broadcaster is willing to accept ANY price") - // Unit of pixels for both O's basePriceInfo and B's MaxBroadcastPrice - pixelsPerUnit := flag.Int("pixelsPerUnit", 1, "Amount of pixels per unit. Set to '> 1' to have smaller price granularity than 1 wei / pixel") - autoAdjustPrice := flag.Bool("autoAdjustPrice", true, "Enable/disable automatic price adjustments based on the overhead for redeeming tickets") - // Interval to poll for blocks - blockPollingInterval := flag.Int("blockPollingInterval", 5, "Interval in seconds at which different blockchain event services poll for blocks") - // Redemption service - redeemer := flag.Bool("redeemer", false, "Set to true to run a ticket redemption service") - redeemerAddr := flag.String("redeemerAddr", "", "URL of the ticket redemption service to use") - // Reward service - reward := flag.Bool("reward", false, "Set to true to run a reward service") - // Metrics & logging: - monitor := flag.Bool("monitor", false, "Set to true to send performance metrics") - metricsPerStream := flag.Bool("metricsPerStream", false, "Set to true to group performance metrics per stream") - metricsExposeClientIP := flag.Bool("metricsClientIP", false, "Set to true to expose client's IP in metrics") version := flag.Bool("version", false, "Print out the version") verbosity := flag.String("v", "", "Log verbosity. {4|5|6}") - metadataQueueUri := flag.String("metadataQueueUri", "", "URI for message broker to send operation metadata") - metadataAmqpExchange := flag.String("metadataAmqpExchange", "lp_golivepeer_metadata", "Name of AMQP exchange to send operation metadata") - metadataPublishTimeout := flag.Duration("metadataPublishTimeout", 1*time.Second, "Max time to wait in background for publishing operation metadata events") - - // Storage: - datadir := flag.String("datadir", "", "Directory that data is stored in") - objectstore := flag.String("objectStore", "", "url of primary object store") - recordstore := flag.String("recordStore", "", "url of object store for recordings") - // API - authWebhookURL := flag.String("authWebhookUrl", "", "RTMP authentication webhook URL") - orchWebhookURL := flag.String("orchWebhookUrl", "", "Orchestrator discovery callback URL") - detectionWebhookURL := flag.String("detectionWebhookUrl", "", "(Experimental) Detection results callback URL") + cfg := parseLivepeerConfig() // Config file _ = flag.String("config", "", "Config file in the format 'key value', flags and env vars take precedence over the config file") - err = ff.Parse(flag.CommandLine, os.Args[1:], + err := ff.Parse(flag.CommandLine, os.Args[1:], ff.WithConfigFileFlag("config"), ff.WithEnvVarPrefix("LP"), ff.WithConfigFileParser(ff.PlainParser), @@ -180,10 +162,7 @@ func main() { vFlag.Value.Set(*verbosity) - isFlagSet := make(map[string]bool) - flag.Visit(func(f *flag.Flag) { isFlagSet[f.Name] = true }) - - blockPollingTime := time.Duration(*blockPollingInterval) * time.Second + cfg = updateNilsForUnsetFlags(cfg) if *mistJson { mistconnector.PrintMistConfigJson( @@ -204,19 +183,277 @@ func main() { return } - if *maxSessions <= 0 { + ctx, cancel := context.WithCancel(context.Background()) + lc := make(chan struct{}) + + go func() { + StartLivepeer(ctx, cfg) + lc <- struct{}{} + }() + + c := make(chan os.Signal) + signal.Notify(c, os.Interrupt) + select { + case sig := <-c: + glog.Infof("Exiting Livepeer: %v", sig) + cancel() + time.Sleep(time.Millisecond * 500) //Give time for other processes to shut down completely + case <-lc: + } +} + +// DefaultLivepeerConfig creates LivepeerConfig exactly the same as when no flags are passed to the livepeer process. +func DefaultLivepeerConfig() LivepeerConfig { + // Network & Addresses: + defaultNetwork := "offchain" + defaultRtmpAddr := "127.0.0.1:" + RtmpPort + defaultCliAddr := "127.0.0.1:" + CliPort + defaultHttpAddr := "" + defaultServiceAddr := "" + defaultOrchAddr := "" + defaultVerifierURL := "" + defaultVerifierPath := "" + + // Transcoding: + defaultOrchestrator := false + defaultTranscoder := false + defaultBroadcaster := false + defaultOrchSecret := "" + defaultTranscodingOptions := "P240p30fps16x9,P360p30fps16x9" + defaultMaxAttempts := 3 + defaultSelectRandFreq := 0.3 + defaultMaxSessions := 10 + defaultCurrentManifest := false + defaultNvidia := "" + defaultTestTranscoder := true + defaultSceneClassificationModelPath := "" + + // Onchain: + defaultEthAcctAddr := "" + defaultEthPassword := "" + defaultEthKeystorePath := "" + defaultEthOrchAddr := "" + defaultEthUrl := "" + defaultTxTimeout := 5 * time.Minute + defaultMaxTxReplacements := 1 + defaultGasLimit := 0 + defaultMaxGasPrice := 0 + defaultEthController := "" + defaultInitializeRound := false + defaultTicketEV := "1000000000000" + defaultMaxTicketEV := "3000000000000" + defaultDepositMultiplier := 1 + defaultMaxPricePerUnit := 0 + defaultPixelsPerUnit := 1 + defaultAutoAdjustPrice := true + defaultBlockPollingInterval := 5 + defaultRedeemer := false + defaultRedeemerAddr := "" + defaultMonitor := false + defaultMetricsPerStream := false + defaultMetricsExposeClientIP := false + defaultMetadataQueueUri := "" + defaultMetadataAmqpExchange := "lp_golivepeer_metadata" + defaultMetadataPublishTimeout := 1 * time.Second + + // Storage: + defaultDatadir := "" + defaultObjectstore := "" + defaultRecordstore := "" + + // API + defaultAuthWebhookURL := "" + defaultOrchWebhookURL := "" + defaultDetectionWebhookURL := "" + + return LivepeerConfig{ + // Network & Addresses: + network: &defaultNetwork, + rtmpAddr: &defaultRtmpAddr, + cliAddr: &defaultCliAddr, + httpAddr: &defaultHttpAddr, + serviceAddr: &defaultServiceAddr, + orchAddr: &defaultOrchAddr, + verifierURL: &defaultVerifierURL, + verifierPath: &defaultVerifierPath, + + // Transcoding: + orchestrator: &defaultOrchestrator, + transcoder: &defaultTranscoder, + broadcaster: &defaultBroadcaster, + orchSecret: &defaultOrchSecret, + transcodingOptions: &defaultTranscodingOptions, + maxAttempts: &defaultMaxAttempts, + selectRandFreq: &defaultSelectRandFreq, + maxSessions: &defaultMaxSessions, + currentManifest: &defaultCurrentManifest, + nvidia: &defaultNvidia, + testTranscoder: &defaultTestTranscoder, + sceneClassificationModelPath: &defaultSceneClassificationModelPath, + + // Onchain: + ethAcctAddr: &defaultEthAcctAddr, + ethPassword: &defaultEthPassword, + ethKeystorePath: &defaultEthKeystorePath, + ethOrchAddr: &defaultEthOrchAddr, + ethUrl: &defaultEthUrl, + txTimeout: &defaultTxTimeout, + maxTxReplacements: &defaultMaxTxReplacements, + gasLimit: &defaultGasLimit, + maxGasPrice: &defaultMaxGasPrice, + ethController: &defaultEthController, + initializeRound: &defaultInitializeRound, + ticketEV: &defaultTicketEV, + maxTicketEV: &defaultMaxTicketEV, + depositMultiplier: &defaultDepositMultiplier, + maxPricePerUnit: &defaultMaxPricePerUnit, + pixelsPerUnit: &defaultPixelsPerUnit, + autoAdjustPrice: &defaultAutoAdjustPrice, + blockPollingInterval: &defaultBlockPollingInterval, + redeemer: &defaultRedeemer, + redeemerAddr: &defaultRedeemerAddr, + monitor: &defaultMonitor, + metricsPerStream: &defaultMetricsPerStream, + metricsExposeClientIP: &defaultMetricsExposeClientIP, + metadataQueueUri: &defaultMetadataQueueUri, + metadataAmqpExchange: &defaultMetadataAmqpExchange, + metadataPublishTimeout: &defaultMetadataPublishTimeout, + + // Storage: + datadir: &defaultDatadir, + objectstore: &defaultObjectstore, + recordstore: &defaultRecordstore, + + // API + authWebhookURL: &defaultAuthWebhookURL, + orchWebhookURL: &defaultOrchWebhookURL, + detectionWebhookURL: &defaultDetectionWebhookURL, + } +} + +func parseLivepeerConfig() LivepeerConfig { + cfg := DefaultLivepeerConfig() + + // Network & Addresses: + cfg.network = flag.String("network", *cfg.network, "Network to connect to") + cfg.rtmpAddr = flag.String("rtmpAddr", *cfg.rtmpAddr, "Address to bind for RTMP commands") + cfg.cliAddr = flag.String("cliAddr", *cfg.cliAddr, "Address to bind for CLI commands") + cfg.httpAddr = flag.String("httpAddr", *cfg.httpAddr, "Address to bind for HTTP commands") + cfg.serviceAddr = flag.String("serviceAddr", *cfg.serviceAddr, "Orchestrator only. Overrides the on-chain serviceURI that broadcasters can use to contact this node; may be an IP or hostname.") + cfg.orchAddr = flag.String("orchAddr", *cfg.orchAddr, "Comma-separated list of orchestrators to connect to") + cfg.verifierURL = flag.String("verifierUrl", *cfg.verifierURL, "URL of the verifier to use") + cfg.verifierPath = flag.String("verifierPath", *cfg.verifierPath, "Path to verifier shared volume") + cfg.localVerify = flag.Bool("localVerify", true, "Set to true to enable local verification i.e. pixel count and signature verification.") + cfg.httpIngest = flag.Bool("httpIngest", true, "Set to true to enable HTTP ingest") + + // Transcoding: + cfg.orchestrator = flag.Bool("orchestrator", *cfg.orchestrator, "Set to true to be an orchestrator") + cfg.transcoder = flag.Bool("transcoder", *cfg.transcoder, "Set to true to be a transcoder") + cfg.broadcaster = flag.Bool("broadcaster", *cfg.broadcaster, "Set to true to be a broadcaster") + cfg.orchSecret = flag.String("orchSecret", *cfg.orchSecret, "Shared secret with the orchestrator as a standalone transcoder") + cfg.transcodingOptions = flag.String("transcodingOptions", *cfg.transcodingOptions, "Transcoding options for broadcast job, or path to json config") + cfg.maxAttempts = flag.Int("maxAttempts", *cfg.maxAttempts, "Maximum transcode attempts") + cfg.selectRandFreq = flag.Float64("selectRandFreq", *cfg.selectRandFreq, "Frequency to randomly select unknown orchestrators (on-chain mode only)") + cfg.maxSessions = flag.Int("maxSessions", *cfg.maxSessions, "Maximum number of concurrent transcoding sessions for Orchestrator, maximum number or RTMP streams for Broadcaster, or maximum capacity for transcoder") + cfg.currentManifest = flag.Bool("currentManifest", *cfg.currentManifest, "Expose the currently active ManifestID as \"/stream/current.m3u8\"") + cfg.nvidia = flag.String("nvidia", *cfg.nvidia, "Comma-separated list of Nvidia GPU device IDs (or \"all\" for all available devices)") + cfg.testTranscoder = flag.Bool("testTranscoder", *cfg.testTranscoder, "Test Nvidia GPU transcoding at startup") + cfg.sceneClassificationModelPath = flag.String("sceneClassificationModelPath", *cfg.sceneClassificationModelPath, "Path to scene classification model") + + // Onchain: + cfg.ethAcctAddr = flag.String("ethAcctAddr", *cfg.ethAcctAddr, "Existing Eth account address") + cfg.ethPassword = flag.String("ethPassword", *cfg.ethPassword, "Password for existing Eth account address") + cfg.ethKeystorePath = flag.String("ethKeystorePath", *cfg.ethKeystorePath, "Path for the Eth Key") + cfg.ethOrchAddr = flag.String("ethOrchAddr", *cfg.ethOrchAddr, "ETH address of an on-chain registered orchestrator") + cfg.ethUrl = flag.String("ethUrl", *cfg.ethUrl, "Ethereum node JSON-RPC URL") + cfg.txTimeout = flag.Duration("transactionTimeout", *cfg.txTimeout, "Amount of time to wait for an Ethereum transaction to confirm before timing out") + cfg.maxTxReplacements = flag.Int("maxTransactionReplacements", *cfg.maxTxReplacements, "Number of times to automatically replace pending Ethereum transactions") + cfg.gasLimit = flag.Int("gasLimit", *cfg.gasLimit, "Gas limit for ETH transactions") + cfg.minGasPrice = flag.Int64("minGasPrice", 0, "Minimum gas price (priority fee + base fee) for ETH transactions in wei, 10 Gwei = 10000000000") + cfg.maxGasPrice = flag.Int("maxGasPrice", *cfg.maxGasPrice, "Maximum gas price (priority fee + base fee) for ETH transactions in wei, 40 Gwei = 40000000000") + cfg.ethController = flag.String("ethController", *cfg.ethController, "Protocol smart contract address") + cfg.initializeRound = flag.Bool("initializeRound", *cfg.initializeRound, "Set to true if running as a transcoder and the node should automatically initialize new rounds") + cfg.ticketEV = flag.String("ticketEV", *cfg.ticketEV, "The expected value for PM tickets") + // Broadcaster max acceptable ticket EV + cfg.maxTicketEV = flag.String("maxTicketEV", *cfg.maxTicketEV, "The maximum acceptable expected value for PM tickets") + // Broadcaster deposit multiplier to determine max acceptable ticket faceValue + cfg.depositMultiplier = flag.Int("depositMultiplier", *cfg.depositMultiplier, "The deposit multiplier used to determine max acceptable faceValue for PM tickets") + // Orchestrator base pricing info + cfg.pricePerUnit = flag.Int("pricePerUnit", 0, "The price per 'pixelsPerUnit' amount pixels") + // Broadcaster max acceptable price + cfg.maxPricePerUnit = flag.Int("maxPricePerUnit", *cfg.maxPricePerUnit, "The maximum transcoding price (in wei) per 'pixelsPerUnit' a broadcaster is willing to accept. If not set explicitly, broadcaster is willing to accept ANY price") + // Unit of pixels for both O's basePriceInfo and B's MaxBroadcastPrice + cfg.pixelsPerUnit = flag.Int("pixelsPerUnit", *cfg.pixelsPerUnit, "Amount of pixels per unit. Set to '> 1' to have smaller price granularity than 1 wei / pixel") + cfg.autoAdjustPrice = flag.Bool("autoAdjustPrice", *cfg.autoAdjustPrice, "Enable/disable automatic price adjustments based on the overhead for redeeming tickets") + // Interval to poll for blocks + cfg.blockPollingInterval = flag.Int("blockPollingInterval", *cfg.blockPollingInterval, "Interval in seconds at which different blockchain event services poll for blocks") + // Redemption service + cfg.redeemer = flag.Bool("redeemer", *cfg.redeemer, "Set to true to run a ticket redemption service") + cfg.redeemerAddr = flag.String("redeemerAddr", *cfg.redeemerAddr, "URL of the ticket redemption service to use") + // Reward service + cfg.reward = flag.Bool("reward", false, "Set to true to run a reward service") + // Metrics & logging: + cfg.monitor = flag.Bool("monitor", *cfg.monitor, "Set to true to send performance metrics") + cfg.metricsPerStream = flag.Bool("metricsPerStream", *cfg.metricsPerStream, "Set to true to group performance metrics per stream") + cfg.metricsExposeClientIP = flag.Bool("metricsClientIP", *cfg.metricsExposeClientIP, "Set to true to expose client's IP in metrics") + cfg.metadataQueueUri = flag.String("metadataQueueUri", *cfg.metadataQueueUri, "URI for message broker to send operation metadata") + cfg.metadataAmqpExchange = flag.String("metadataAmqpExchange", *cfg.metadataAmqpExchange, "Name of AMQP exchange to send operation metadata") + cfg.metadataPublishTimeout = flag.Duration("metadataPublishTimeout", *cfg.metadataPublishTimeout, "Max time to wait in background for publishing operation metadata events") + + // Storage: + cfg.datadir = flag.String("datadir", *cfg.datadir, "Directory that data is stored in") + cfg.objectstore = flag.String("objectStore", *cfg.objectstore, "url of primary object store") + cfg.recordstore = flag.String("recordStore", *cfg.recordstore, "url of object store for recordings") + + // API + cfg.authWebhookURL = flag.String("authWebhookUrl", *cfg.authWebhookURL, "RTMP authentication webhook URL") + cfg.orchWebhookURL = flag.String("orchWebhookUrl", *cfg.orchWebhookURL, "Orchestrator discovery callback URL") + cfg.detectionWebhookURL = flag.String("detectionWebhookUrl", *cfg.detectionWebhookURL, "(Experimental) Detection results callback URL") + + return cfg +} + +// updateNilsForUnsetFlags changes some cfg fields to nil if they were not explicitly set with flags. +// For some flags, the behavior is different whether the value is default or not set by the user at all. +func updateNilsForUnsetFlags(cfg LivepeerConfig) LivepeerConfig { + res := cfg + + isFlagSet := make(map[string]bool) + flag.Visit(func(f *flag.Flag) { isFlagSet[f.Name] = true }) + + if !isFlagSet["minGasPrice"] { + res.minGasPrice = nil + } + if !isFlagSet["pricePerUnit"] { + res.pricePerUnit = nil + } + if !isFlagSet["reward"] { + res.reward = nil + } + if !isFlagSet["httpIngest"] { + res.httpIngest = nil + } + if !isFlagSet["localVerify"] { + res.localVerify = nil + } + + return res +} + +func StartLivepeer(ctx context.Context, cfg LivepeerConfig) { + if *cfg.maxSessions <= 0 { glog.Fatal("-maxSessions must be greater than zero") return } + blockPollingTime := time.Duration(*cfg.blockPollingInterval) * time.Second + type NetworkConfig struct { ethController string minGasPrice int64 redeemGas int } - ctx := context.Background() - configOptions := map[string]*NetworkConfig{ "rinkeby": { ethController: "0x9a9827455911a858E55f07911904fACC0D66027E", @@ -238,96 +475,89 @@ func main() { } // If multiple orchAddr specified, ensure other necessary flags present and clean up list - var orchURLs []*url.URL - if len(*orchAddr) > 0 { - for _, addr := range strings.Split(*orchAddr, ",") { - addr = strings.TrimSpace(addr) - addr = defaultAddr(addr, "127.0.0.1", RpcPort) - if !strings.HasPrefix(addr, "http") { - addr = "https://" + addr - } - uri, err := url.ParseRequestURI(addr) - if err != nil { - glog.Error("Could not parse orchestrator URI: ", err) - continue - } - orchURLs = append(orchURLs, uri) - } - } + orchURLs := parseOrchAddrs(*cfg.orchAddr) // Setting config options based on specified network var redeemGas int - if netw, ok := configOptions[*network]; ok { - if *ethController == "" { - *ethController = netw.ethController + minGasPrice := int64(0) + if cfg.minGasPrice != nil { + minGasPrice = *cfg.minGasPrice + } + if netw, ok := configOptions[*cfg.network]; ok { + if *cfg.ethController == "" { + *cfg.ethController = netw.ethController } - if !isFlagSet["minGasPrice"] { - *minGasPrice = netw.minGasPrice + if cfg.minGasPrice == nil { + minGasPrice = netw.minGasPrice } redeemGas = netw.redeemGas - glog.Infof("***Livepeer is running on the %v network: %v***", *network, *ethController) + glog.Infof("***Livepeer is running on the %v network: %v***", *cfg.network, *cfg.ethController) } else { redeemGas = redeemGasL1 - glog.Infof("***Livepeer is running on the %v network***", *network) + glog.Infof("***Livepeer is running on the %v network***", *cfg.network) } - if *datadir == "" { + if *cfg.datadir == "" { homedir := os.Getenv("HOME") if homedir == "" { + usr, err := user.Current() + if err != nil { + glog.Fatalf("Cannot find current user: %v", err) + } homedir = usr.HomeDir } - *datadir = filepath.Join(homedir, ".lpData", *network) + *cfg.datadir = filepath.Join(homedir, ".lpData", *cfg.network) } //Make sure datadir is present - if _, err := os.Stat(*datadir); os.IsNotExist(err) { - glog.Infof("Creating data dir: %v", *datadir) - if err = os.MkdirAll(*datadir, 0755); err != nil { + if _, err := os.Stat(*cfg.datadir); os.IsNotExist(err) { + glog.Infof("Creating data dir: %v", *cfg.datadir) + if err = os.MkdirAll(*cfg.datadir, 0755); err != nil { glog.Errorf("Error creating datadir: %v", err) } } //Set up DB - dbh, err := common.InitDB(*datadir + "/lpdb.sqlite3") + dbh, err := common.InitDB(*cfg.datadir + "/lpdb.sqlite3") if err != nil { glog.Errorf("Error opening DB: %v", err) return } defer dbh.Close() - n, err := core.NewLivepeerNode(nil, *datadir, dbh) + n, err := core.NewLivepeerNode(nil, *cfg.datadir, dbh) if err != nil { glog.Errorf("Error creating livepeer node: %v", err) } - if *orchSecret != "" { - n.OrchSecret, _ = common.GetPass(*orchSecret) + if *cfg.orchSecret != "" { + n.OrchSecret, _ = common.GetPass(*cfg.orchSecret) } var transcoderCaps []core.Capability - if *transcoder { - core.WorkDir = *datadir - if *nvidia != "" { + if *cfg.transcoder { + core.WorkDir = *cfg.datadir + if *cfg.nvidia != "" { // Get a list of device ids - devices, err := common.ParseNvidiaDevices(*nvidia) + devices, err := common.ParseNvidiaDevices(*cfg.nvidia) if err != nil { - glog.Fatalf("Error while parsing '-nvidia %v' flag: %v", *nvidia, err) + glog.Fatalf("Error while parsing '-nvidia %v' flag: %v", *cfg.nvidia, err) } glog.Infof("Transcoding on these Nvidia GPUs: %v", devices) // Test transcoding with nvidia - if *testTranscoder { + if *cfg.testTranscoder { transcoderCaps, err = core.TestTranscoderCapabilities(devices) if err != nil { glog.Fatal(err) } } // FIXME: Short-term hack to pre-load the detection models on every device - if *sceneClassificationModelPath != "" { + if *cfg.sceneClassificationModelPath != "" { detectorProfile := ffmpeg.DSceneAdultSoccer - detectorProfile.ModelPath = *sceneClassificationModelPath + detectorProfile.ModelPath = *cfg.sceneClassificationModelPath core.DetectorProfile = &detectorProfile for _, d := range devices { tc, err := core.NewNvidiaTranscoderWithDetector(&detectorProfile, d) @@ -342,40 +572,40 @@ func main() { } else { // for local software mode, enable all capabilities transcoderCaps = append(core.DefaultCapabilities(), core.OptionalCapabilities()...) - n.Transcoder = core.NewLocalTranscoder(*datadir) + n.Transcoder = core.NewLocalTranscoder(*cfg.datadir) } } - if *redeemer { + if *cfg.redeemer { n.NodeType = core.RedeemerNode - } else if *orchestrator { + } else if *cfg.orchestrator { n.NodeType = core.OrchestratorNode - if !*transcoder { + if !*cfg.transcoder { n.TranscoderManager = core.NewRemoteTranscoderManager() n.Transcoder = n.TranscoderManager } - } else if *transcoder { + } else if *cfg.transcoder { n.NodeType = core.TranscoderNode - } else if *broadcaster { + } else if *cfg.broadcaster { n.NodeType = core.BroadcasterNode - } else if !*reward && !*initializeRound { + } else if (cfg.reward == nil || !*cfg.reward) && !*cfg.initializeRound { glog.Fatalf("No services enabled; must be at least one of -broadcaster, -transcoder, -orchestrator, -redeemer, -reward or -initializeRound") } - lpmon.NodeID = *ethAcctAddr + lpmon.NodeID = *cfg.ethAcctAddr if lpmon.NodeID != "" { lpmon.NodeID += "-" } hn, _ := os.Hostname() lpmon.NodeID += hn - if *monitor { - if *metricsExposeClientIP { - *metricsPerStream = true + if *cfg.monitor { + if *cfg.metricsExposeClientIP { + *cfg.metricsPerStream = true } lpmon.Enabled = true - lpmon.PerStreamMetrics = *metricsPerStream - lpmon.ExposeClientIP = *metricsExposeClientIP + lpmon.PerStreamMetrics = *cfg.metricsPerStream + lpmon.ExposeClientIP = *cfg.metricsExposeClientIP nodeType := lpmon.Default switch n.NodeType { case core.BroadcasterNode: @@ -393,7 +623,7 @@ func main() { watcherErr := make(chan error) serviceErr := make(chan error) var timeWatcher *watchers.TimeWatcher - if *network == "offchain" { + if *cfg.network == "offchain" { glog.Infof("***Livepeer is in off-chain mode***") if err := checkOrStoreChainID(dbh, big.NewInt(0)); err != nil { @@ -403,10 +633,10 @@ func main() { } else { var keystoreDir string - if _, err := os.Stat(*ethKeystorePath); !os.IsNotExist(err) { - keystoreDir, _ = filepath.Split(*ethKeystorePath) + if _, err := os.Stat(*cfg.ethKeystorePath); !os.IsNotExist(err) { + keystoreDir, _ = filepath.Split(*cfg.ethKeystorePath) } else { - keystoreDir = filepath.Join(*datadir, "keystore") + keystoreDir = filepath.Join(*cfg.datadir, "keystore") } if keystoreDir == "" { @@ -415,12 +645,12 @@ func main() { } //Get the Eth client connection information - if *ethUrl == "" { + if *cfg.ethUrl == "" { glog.Fatal("Need to specify an Ethereum node JSON-RPC URL using -ethUrl") } //Set up eth client - backend, err := ethclient.Dial(*ethUrl) + backend, err := ethclient.Dial(*cfg.ethUrl) if err != nil { glog.Errorf("Failed to connect to Ethereum client: %v", err) return @@ -443,11 +673,11 @@ func main() { } var bigMaxGasPrice *big.Int - if *maxGasPrice > 0 { - bigMaxGasPrice = big.NewInt(int64(*maxGasPrice)) + if *cfg.maxGasPrice > 0 { + bigMaxGasPrice = big.NewInt(int64(*cfg.maxGasPrice)) } - gpm := eth.NewGasPriceMonitor(backend, blockPollingTime, big.NewInt(*minGasPrice), bigMaxGasPrice) + gpm := eth.NewGasPriceMonitor(backend, blockPollingTime, big.NewInt(minGasPrice), bigMaxGasPrice) // Start gas price monitor _, err = gpm.Start(ctx) if err != nil { @@ -456,29 +686,29 @@ func main() { } defer gpm.Stop() - am, err := eth.NewAccountManager(ethcommon.HexToAddress(*ethAcctAddr), keystoreDir, chainID) + am, err := eth.NewAccountManager(ethcommon.HexToAddress(*cfg.ethAcctAddr), keystoreDir, chainID) if err != nil { glog.Errorf("Error creating Ethereum account manager: %v", err) return } - if err := am.Unlock(*ethPassword); err != nil { + if err := am.Unlock(*cfg.ethPassword); err != nil { glog.Errorf("Error unlocking Ethereum account: %v", err) return } - tm := eth.NewTransactionManager(backend, gpm, am, *txTimeout, *maxTxReplacements) + tm := eth.NewTransactionManager(backend, gpm, am, *cfg.txTimeout, *cfg.maxTxReplacements) go tm.Start() defer tm.Stop() ethCfg := eth.LivepeerEthClientConfig{ AccountManager: am, - ControllerAddr: ethcommon.HexToAddress(*ethController), + ControllerAddr: ethcommon.HexToAddress(*cfg.ethController), EthClient: backend, GasPriceMonitor: gpm, TransactionManager: tm, Signer: types.LatestSignerForChainID(chainID), - CheckTxTimeout: time.Duration(int64(*txTimeout) * int64(*maxTxReplacements+1)), + CheckTxTimeout: time.Duration(int64(*cfg.txTimeout) * int64(*cfg.maxTxReplacements+1)), } client, err := eth.NewClient(ethCfg) @@ -487,7 +717,7 @@ func main() { return } - if err := client.SetGasInfo(uint64(*gasLimit)); err != nil { + if err := client.SetGasInfo(uint64(*cfg.gasLimit)); err != nil { glog.Errorf("Failed to set gas info on Livepeer Ethereum Client: %v", err) return } @@ -501,7 +731,7 @@ func main() { addrMap := n.Eth.ContractAddresses() // Initialize block watcher that will emit logs used by event watchers - blockWatcherClient, err := blockwatch.NewRPCClient(*ethUrl, ethRPCTimeout) + blockWatcherClient, err := blockwatch.NewRPCClient(*cfg.ethUrl, ethRPCTimeout) if err != nil { glog.Errorf("Failed to setup blockwatch client: %v", err) return @@ -574,8 +804,8 @@ func main() { // By default the ticket recipient is the node's address // If the address of an on-chain registered orchestrator is provided, then it should be specified as the ticket recipient recipientAddr := n.Eth.Account().Address - if *ethOrchAddr != "" { - recipientAddr = ethcommon.HexToAddress(*ethOrchAddr) + if *cfg.ethOrchAddr != "" { + recipientAddr = ethcommon.HexToAddress(*cfg.ethOrchAddr) } smCfg := &pm.LocalSenderMonitorConfig{ @@ -587,32 +817,32 @@ func main() { RPCTimeout: ethRPCTimeout, } - if *orchestrator { + if *cfg.orchestrator { // Set price per pixel base info - if *pixelsPerUnit <= 0 { + if *cfg.pixelsPerUnit <= 0 { // Can't divide by 0 - panic(fmt.Errorf("-pixelsPerUnit must be > 0, provided %d", *pixelsPerUnit)) + panic(fmt.Errorf("-pixelsPerUnit must be > 0, provided %d", *cfg.pixelsPerUnit)) } - if !isFlagSet["pricePerUnit"] && *pricePerUnit == 0 { + if cfg.pricePerUnit == nil { // Prevent orchestrators from unknowingly providing free transcoding panic(fmt.Errorf("-pricePerUnit must be set")) } - if *pricePerUnit < 0 { - panic(fmt.Errorf("-pricePerUnit must be >= 0, provided %d", *pricePerUnit)) + if *cfg.pricePerUnit < 0 { + panic(fmt.Errorf("-pricePerUnit must be >= 0, provided %d", *cfg.pricePerUnit)) } - n.SetBasePrice(big.NewRat(int64(*pricePerUnit), int64(*pixelsPerUnit))) - glog.Infof("Price: %d wei for %d pixels\n ", *pricePerUnit, *pixelsPerUnit) + n.SetBasePrice(big.NewRat(int64(*cfg.pricePerUnit), int64(*cfg.pixelsPerUnit))) + glog.Infof("Price: %d wei for %d pixels\n ", *cfg.pricePerUnit, *cfg.pixelsPerUnit) - n.AutoAdjustPrice = *autoAdjustPrice + n.AutoAdjustPrice = *cfg.autoAdjustPrice - ev, _ := new(big.Int).SetString(*ticketEV, 10) + ev, _ := new(big.Int).SetString(*cfg.ticketEV, 10) if ev == nil { - glog.Errorf("-ticketEV must be a valid integer, but %v provided. Restart the node with a different valid value for -ticketEV", *ticketEV) + glog.Errorf("-ticketEV must be a valid integer, but %v provided. Restart the node with a different valid value for -ticketEV", *cfg.ticketEV) return } if ev.Cmp(big.NewInt(0)) < 0 { - glog.Errorf("-ticketEV must be greater than 0, but %v provided. Restart the node with a different valid value for -ticketEV", *ticketEV) + glog.Errorf("-ticketEV must be greater than 0, but %v provided. Restart the node with a different valid value for -ticketEV", *cfg.ticketEV) return } @@ -625,9 +855,9 @@ func main() { validator := pm.NewValidator(sigVerifier, timeWatcher) var sm pm.SenderMonitor - if *redeemerAddr != "" { - *redeemerAddr = defaultAddr(*redeemerAddr, "127.0.0.1", RpcPort) - rc, err := server.NewRedeemerClient(*redeemerAddr, senderWatcher, timeWatcher) + if *cfg.redeemerAddr != "" { + *cfg.redeemerAddr = defaultAddr(*cfg.redeemerAddr, "127.0.0.1", RpcPort) + rc, err := server.NewRedeemerClient(*cfg.redeemerAddr, senderWatcher, timeWatcher) if err != nil { glog.Error("Unable to start redeemer client: ", err) return @@ -662,17 +892,17 @@ func main() { } if n.NodeType == core.BroadcasterNode { - ev, _ := new(big.Rat).SetString(*maxTicketEV) + ev, _ := new(big.Rat).SetString(*cfg.maxTicketEV) if ev == nil { - panic(fmt.Errorf("-maxTicketEV must be a valid rational number, but %v provided. Restart the node with a valid value for -maxTicketEV", *maxTicketEV)) + panic(fmt.Errorf("-maxTicketEV must be a valid rational number, but %v provided. Restart the node with a valid value for -maxTicketEV", *cfg.maxTicketEV)) } if ev.Cmp(big.NewRat(0, 1)) < 0 { - panic(fmt.Errorf("-maxTicketEV must not be negative, but %v provided. Restart the node with a valid value for -maxTicketEV", *maxTicketEV)) + panic(fmt.Errorf("-maxTicketEV must not be negative, but %v provided. Restart the node with a valid value for -maxTicketEV", *cfg.maxTicketEV)) } - if *depositMultiplier <= 0 { - panic(fmt.Errorf("-depositMultiplier must be greater than 0, but %v provided. Restart the node with a valid value for -depositMultiplier", *depositMultiplier)) + if *cfg.depositMultiplier <= 0 { + panic(fmt.Errorf("-depositMultiplier must be greater than 0, but %v provided. Restart the node with a valid value for -depositMultiplier", *cfg.depositMultiplier)) } // Fetch and cache broadcaster on-chain info @@ -684,16 +914,16 @@ func main() { glog.Info("Broadcaster Deposit: ", eth.FormatUnits(info.Deposit, "ETH")) glog.Info("Broadcaster Reserve: ", eth.FormatUnits(info.Reserve.FundsRemaining, "ETH")) - n.Sender = pm.NewSender(n.Eth, timeWatcher, senderWatcher, ev, *depositMultiplier) + n.Sender = pm.NewSender(n.Eth, timeWatcher, senderWatcher, ev, *cfg.depositMultiplier) - if *pixelsPerUnit <= 0 { + if *cfg.pixelsPerUnit <= 0 { // Can't divide by 0 - panic(fmt.Errorf("The amount of pixels per unit must be greater than 0, provided %d instead\n", *pixelsPerUnit)) + panic(fmt.Errorf("The amount of pixels per unit must be greater than 0, provided %d instead\n", *cfg.pixelsPerUnit)) } - if *maxPricePerUnit > 0 { - server.BroadcastCfg.SetMaxPrice(big.NewRat(int64(*maxPricePerUnit), int64(*pixelsPerUnit))) + if *cfg.maxPricePerUnit > 0 { + server.BroadcastCfg.SetMaxPrice(big.NewRat(int64(*cfg.maxPricePerUnit), int64(*cfg.pixelsPerUnit))) } else { - glog.Infof("Maximum transcoding price per pixel is not greater than 0: %v, broadcaster is currently set to accept ANY price.\n", *maxPricePerUnit) + glog.Infof("Maximum transcoding price per pixel is not greater than 0: %v, broadcaster is currently set to accept ANY price.\n", *cfg.maxPricePerUnit) glog.Infoln("To update the broadcaster's maximum acceptable transcoding price per pixel, use the CLI or restart the broadcaster with the appropriate 'maxPricePerUnit' and 'pixelsPerUnit' values") } } @@ -714,8 +944,8 @@ func main() { return } - *httpAddr = defaultAddr(*httpAddr, "127.0.0.1", RpcPort) - url, err := url.ParseRequestURI("https://" + *httpAddr) + *cfg.httpAddr = defaultAddr(*cfg.httpAddr, "127.0.0.1", RpcPort) + url, err := url.ParseRequestURI("https://" + *cfg.httpAddr) if err != nil { glog.Error("Could not parse redeemer URI: ", err) return @@ -728,10 +958,11 @@ func main() { } }() defer r.Stop() - glog.Infof("Redeemer started on %v", *httpAddr) + glog.Infof("Redeemer started on %v", *cfg.httpAddr) } - if !isFlagSet["reward"] { + var reward bool + if cfg.reward == nil { // If the node address is an on-chain registered address, start the reward service t, err := n.Eth.GetTranscoder(n.Eth.Account().Address) if err != nil { @@ -739,11 +970,13 @@ func main() { return } if t.Status == "Registered" { - *reward = true + reward = true + } else { + reward = false } } - if *reward { + if reward { // Start reward service // The node will only call reward if it is active in the current round rs := eth.NewRewardService(n.Eth, timeWatcher) @@ -756,7 +989,7 @@ func main() { defer rs.Stop() } - if *initializeRound { + if *cfg.initializeRound { // Start round initializer // The node will only initialize rounds if it in the upcoming active set for the round initializer := eth.NewRoundInitializer(n.Eth, timeWatcher) @@ -797,8 +1030,8 @@ func main() { }() } - if *objectstore != "" { - prepared, err := drivers.PrepareOSURL(*objectstore) + if *cfg.objectstore != "" { + prepared, err := drivers.PrepareOSURL(*cfg.objectstore) if err != nil { glog.Error("Error creating object store driver: ", err) return @@ -810,8 +1043,8 @@ func main() { } } - if *recordstore != "" { - prepared, err := drivers.PrepareOSURL(*recordstore) + if *cfg.recordstore != "" { + prepared, err := drivers.PrepareOSURL(*cfg.recordstore) if err != nil { glog.Error("Error creating recordings object store driver: ", err) return @@ -823,13 +1056,13 @@ func main() { } } - core.MaxSessions = *maxSessions + core.MaxSessions = *cfg.maxSessions if lpmon.Enabled { lpmon.MaxSessions(core.MaxSessions) } - if *authWebhookURL != "" { - parsedUrl, err := validateURL(*authWebhookURL) + if *cfg.authWebhookURL != "" { + parsedUrl, err := validateURL(*cfg.authWebhookURL) if err != nil { glog.Fatal("Error setting auth webhook URL ", err) } @@ -837,27 +1070,28 @@ func main() { server.AuthWebhookURL = parsedUrl } - if *detectionWebhookURL != "" { - parsedUrl, err := validateURL(*detectionWebhookURL) + if *cfg.detectionWebhookURL != "" { + parsedUrl, err := validateURL(*cfg.detectionWebhookURL) if err != nil { glog.Fatal("Error setting detection webhook URL ", err) } glog.Info("Using detection webhook URL ", parsedUrl.Redacted()) server.DetectionWebhookURL = parsedUrl } + httpIngest := true if n.NodeType == core.BroadcasterNode { // default lpms listener for broadcaster; same as default rpc port // TODO provide an option to disable this? - *rtmpAddr = defaultAddr(*rtmpAddr, "127.0.0.1", RtmpPort) - *httpAddr = defaultAddr(*httpAddr, "127.0.0.1", RpcPort) + *cfg.rtmpAddr = defaultAddr(*cfg.rtmpAddr, "127.0.0.1", RtmpPort) + *cfg.httpAddr = defaultAddr(*cfg.httpAddr, "127.0.0.1", RpcPort) bcast := core.NewBroadcaster(n) // When the node is on-chain mode always cache the on-chain orchestrators and poll for updates // Right now we rely on the DBOrchestratorPoolCache constructor to do this. Consider separating the logic // caching/polling from the logic for fetching orchestrators during discovery - if *network != "offchain" { + if *cfg.network != "offchain" { ctx, cancel := context.WithCancel(ctx) defer cancel() dbOrchPoolCache, err := discovery.NewDBOrchestratorPoolCache(ctx, n, timeWatcher) @@ -869,8 +1103,8 @@ func main() { } // Set up orchestrator discovery - if *orchWebhookURL != "" { - whurl, err := validateURL(*orchWebhookURL) + if *cfg.orchWebhookURL != "" { + whurl, err := validateURL(*cfg.orchWebhookURL) if err != nil { glog.Fatal("Error setting orch webhook URL ", err) } @@ -885,82 +1119,89 @@ func main() { glog.Error("No orchestrator specified; transcoding will not happen") } - isLocalHTTP, err := isLocalURL("https://" + *httpAddr) + isLocalHTTP, err := isLocalURL("https://" + *cfg.httpAddr) if err != nil { glog.Errorf("Error checking for local -httpAddr: %v", err) return } - if !isFlagSet["httpIngest"] && !isLocalHTTP && server.AuthWebhookURL == nil { + if cfg.httpIngest != nil { + httpIngest = *cfg.httpIngest + } + if cfg.httpIngest == nil && !isLocalHTTP && server.AuthWebhookURL == nil { glog.Warning("HTTP ingest is disabled because -httpAddr is publicly accessible. To enable, configure -authWebhookUrl or use the -httpIngest flag") - *httpIngest = false + httpIngest = false } // Disable local verification when running in off-chain mode // To enable, set -localVerify or -verifierURL - if !isFlagSet["localVerify"] && *network == "offchain" { - *localVerify = false + localVerify := true + if cfg.localVerify != nil { + localVerify = *cfg.localVerify + } + if cfg.localVerify == nil && *cfg.network == "offchain" { + localVerify = false } - if *verifierURL != "" { - _, err := validateURL(*verifierURL) + if *cfg.verifierURL != "" { + _, err := validateURL(*cfg.verifierURL) if err != nil { glog.Fatal("Error setting verifier URL ", err) } - glog.Info("Using the Epic Labs classifier for verification at ", *verifierURL) - server.Policy = &verification.Policy{Retries: 2, Verifier: &verification.EpicClassifier{Addr: *verifierURL}} + glog.Info("Using the Epic Labs classifier for verification at ", *cfg.verifierURL) + server.Policy = &verification.Policy{Retries: 2, Verifier: &verification.EpicClassifier{Addr: *cfg.verifierURL}} // Set the verifier path. Remove once [1] is implemented! // [1] https://github.com/livepeer/verification-classifier/issues/64 - if drivers.NodeStorage == nil && *verifierPath == "" { + if drivers.NodeStorage == nil && *cfg.verifierPath == "" { glog.Fatal("Requires a path to the verifier shared volume when local storage is in use; use -verifierPath or -objectStore") } - verification.VerifierPath = *verifierPath - } else if *localVerify { + verification.VerifierPath = *cfg.verifierPath + } else if localVerify { glog.Info("Local verification enabled") server.Policy = &verification.Policy{Retries: 2} } // Set max transcode attempts. <=0 is OK; it just means "don't transcode" - server.MaxAttempts = *maxAttempts - server.SelectRandFreq = *selectRandFreq + server.MaxAttempts = *cfg.maxAttempts + server.SelectRandFreq = *cfg.selectRandFreq } else if n.NodeType == core.OrchestratorNode { - suri, err := getServiceURI(n, *serviceAddr) + suri, err := getServiceURI(n, *cfg.serviceAddr) if err != nil { glog.Fatal("Error getting service URI: ", err) } n.SetServiceURI(suri) // if http addr is not provided, listen to all ifaces // take the port to listen to from the service URI - *httpAddr = defaultAddr(*httpAddr, "", n.GetServiceURI().Port()) + *cfg.httpAddr = defaultAddr(*cfg.httpAddr, "", n.GetServiceURI().Port()) - if *sceneClassificationModelPath != "" { + if *cfg.sceneClassificationModelPath != "" { // Only enable experimental capabilities if scene classification model is actually loaded transcoderCaps = append(transcoderCaps, core.ExperimentalCapabilities()...) } - if !*transcoder && n.OrchSecret == "" { + if !*cfg.transcoder && n.OrchSecret == "" { glog.Fatal("Running an orchestrator requires an -orchSecret for standalone mode or -transcoder for orchestrator+transcoder mode") } } n.Capabilities = core.NewCapabilities(transcoderCaps, core.MandatoryOCapabilities()) - *cliAddr = defaultAddr(*cliAddr, "127.0.0.1", CliPort) + *cfg.cliAddr = defaultAddr(*cfg.cliAddr, "127.0.0.1", CliPort) if drivers.NodeStorage == nil { // base URI will be empty for broadcasters; that's OK drivers.NodeStorage = drivers.NewMemoryDriver(n.GetServiceURI()) } - if *metadataPublishTimeout > 0 { - server.MetadataPublishTimeout = *metadataPublishTimeout + if *cfg.metadataPublishTimeout > 0 { + server.MetadataPublishTimeout = *cfg.metadataPublishTimeout } - if *metadataQueueUri != "" { - uri, err := url.ParseRequestURI(*metadataQueueUri) + if *cfg.metadataQueueUri != "" { + uri, err := url.ParseRequestURI(*cfg.metadataQueueUri) if err != nil { glog.Fatalf("Error parsing -metadataQueueUri: err=%q", err) } switch uri.Scheme { case "amqp", "amqps": - uriStr, exchange, keyNs := *metadataQueueUri, *metadataAmqpExchange, n.NodeType.String() + uriStr, exchange, keyNs := *cfg.metadataQueueUri, *cfg.metadataAmqpExchange, n.NodeType.String() server.MetadataQueue, err = event.NewAMQPExchangeProducer(context.Background(), uriStr, exchange, keyNs) if err != nil { glog.Fatalf("Error establishing AMQP connection: err=%q", err) @@ -973,7 +1214,7 @@ func main() { //Create Livepeer Node //Set up the media server - s, err := server.NewLivepeerServer(*rtmpAddr, n, *httpIngest, *transcodingOptions) + s, err := server.NewLivepeerServer(*cfg.rtmpAddr, n, httpIngest, *cfg.transcodingOptions) if err != nil { glog.Fatal("Error creating Livepeer server err=", err) } @@ -984,18 +1225,18 @@ func main() { msCtx, cancel := context.WithCancel(ctx) defer cancel() - if *currentManifest { - glog.Info("Current ManifestID will be available over ", *httpAddr) - s.ExposeCurrentManifest = *currentManifest + if *cfg.currentManifest { + glog.Info("Current ManifestID will be available over ", *cfg.httpAddr) + s.ExposeCurrentManifest = *cfg.currentManifest } go func() { - s.StartCliWebserver(*cliAddr) + s.StartCliWebserver(*cfg.cliAddr) close(wc) }() if n.NodeType != core.RedeemerNode { go func() { - ec <- s.StartMediaServer(msCtx, *httpAddr) + ec <- s.StartMediaServer(msCtx, *cfg.httpAddr) }() } @@ -1007,7 +1248,7 @@ func main() { orch := core.NewOrchestrator(s.LivepeerNode, timeWatcher) go func() { - server.StartTranscodeServer(orch, *httpAddr, s.HTTPMux, n.WorkDir, n.TranscoderManager != nil) + server.StartTranscodeServer(orch, *cfg.httpAddr, s.HTTPMux, n.WorkDir, n.TranscoderManager != nil) tc <- struct{}{} }() @@ -1030,7 +1271,7 @@ func main() { glog.Fatal("Missing -orchAddr") } - go server.RunTranscoder(n, orchURLs[0].Host, *maxSessions, transcoderCaps) + go server.RunTranscoder(n, orchURLs[0].Host, *cfg.maxSessions, transcoderCaps) } switch n.NodeType { @@ -1038,15 +1279,13 @@ func main() { glog.Infof("***Livepeer Running in Orchestrator Mode***") case core.BroadcasterNode: glog.Infof("***Livepeer Running in Broadcaster Mode***") - glog.Infof("Video Ingest Endpoint - rtmp://%v", *rtmpAddr) + glog.Infof("Video Ingest Endpoint - rtmp://%v", *cfg.rtmpAddr) case core.TranscoderNode: glog.Infof("**Liveepeer Running in Transcoder Mode***") case core.RedeemerNode: glog.Infof("**Livepeer Running in Redeemer Mode**") } - c := make(chan os.Signal) - signal.Notify(c, os.Interrupt) select { case err := <-watcherErr: glog.Error(err) @@ -1058,21 +1297,39 @@ func main() { if err != nil { glog.Fatalf("Error starting service: %v", err) } - case <-msCtx.Done(): - glog.Infof("MediaServer Done()") - return case <-tc: glog.Infof("Orchestrator server shut down") case <-wc: glog.Infof("CLI webserver shut down") return - case sig := <-c: - glog.Infof("Exiting Livepeer: %v", sig) - time.Sleep(time.Millisecond * 500) //Give time for other processes to shut down completely + case <-msCtx.Done(): + glog.Infof("MediaServer Done()") + return + case <-ctx.Done(): return } } +func parseOrchAddrs(addrs string) []*url.URL { + var res []*url.URL + if len(addrs) > 0 { + for _, addr := range strings.Split(addrs, ",") { + addr = strings.TrimSpace(addr) + addr = defaultAddr(addr, "127.0.0.1", RpcPort) + if !strings.HasPrefix(addr, "http") { + addr = "https://" + addr + } + uri, err := url.ParseRequestURI(addr) + if err != nil { + glog.Error("Could not parse orchestrator URI: ", err) + continue + } + res = append(res, uri) + } + } + return res +} + func validateURL(u string) (*url.URL, error) { if u == "" { return nil, nil