Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initialize round by any B/O who has the initializeRound flag set to true #3029

Merged
merged 3 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func parseLivepeerConfig() starter.LivepeerConfig {
cfg.MaxGasPrice = flag.Int("maxGasPrice", *cfg.MaxGasPrice, "Maximum gas price (priority fee + base fee) for ETH transactions in wei, 40 Gwei = 40000000000")
cfg.EthController = flag.String("ethController", *cfg.EthController, "Protocol smart contract address")
cfg.InitializeRound = flag.Bool("initializeRound", *cfg.InitializeRound, "Set to true if running as a transcoder and the node should automatically initialize new rounds")
cfg.InitializeRoundMaxDelay = flag.Duration("initializeRoundMaxDelay", *cfg.InitializeRoundMaxDelay, "Maximum delay to wait before initializing a round")
cfg.TicketEV = flag.String("ticketEV", *cfg.TicketEV, "The expected value for PM tickets")
cfg.MaxFaceValue = flag.String("maxFaceValue", *cfg.MaxFaceValue, "set max ticket face value in WEI")
// Broadcaster max acceptable ticket EV
Expand Down
205 changes: 104 additions & 101 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,76 +74,77 @@
)

type LivepeerConfig struct {
Network *string
RtmpAddr *string
CliAddr *string
HttpAddr *string
ServiceAddr *string
OrchAddr *string
VerifierURL *string
EthController *string
VerifierPath *string
LocalVerify *bool
HttpIngest *bool
Orchestrator *bool
Transcoder *bool
Broadcaster *bool
OrchSecret *string
TranscodingOptions *string
MaxAttempts *int
SelectRandWeight *float64
SelectStakeWeight *float64
SelectPriceWeight *float64
SelectPriceExpFactor *float64
OrchPerfStatsURL *string
Region *string
MaxPricePerUnit *string
MinPerfScore *float64
MaxSessions *string
CurrentManifest *bool
Nvidia *string
Netint *string
TestTranscoder *bool
EthAcctAddr *string
EthPassword *string
EthKeystorePath *string
EthOrchAddr *string
EthUrl *string
TxTimeout *time.Duration
MaxTxReplacements *int
GasLimit *int
MinGasPrice *int64
MaxGasPrice *int
InitializeRound *bool
TicketEV *string
MaxFaceValue *string
MaxTicketEV *string
MaxTotalEV *string
DepositMultiplier *int
PricePerUnit *string
PixelsPerUnit *string
PriceFeedAddr *string
AutoAdjustPrice *bool
PricePerBroadcaster *string
BlockPollingInterval *int
Redeemer *bool
RedeemerAddr *string
Reward *bool
Monitor *bool
MetricsPerStream *bool
MetricsExposeClientIP *bool
MetadataQueueUri *string
MetadataAmqpExchange *string
MetadataPublishTimeout *time.Duration
Datadir *string
Objectstore *string
Recordstore *string
FVfailGsBucket *string
FVfailGsKey *string
AuthWebhookURL *string
OrchWebhookURL *string
OrchBlacklist *string
TestOrchAvail *bool
Network *string
RtmpAddr *string
CliAddr *string
HttpAddr *string
ServiceAddr *string
OrchAddr *string
VerifierURL *string
EthController *string
VerifierPath *string
LocalVerify *bool
HttpIngest *bool
Orchestrator *bool
Transcoder *bool
Broadcaster *bool
OrchSecret *string
TranscodingOptions *string
MaxAttempts *int
SelectRandWeight *float64
SelectStakeWeight *float64
SelectPriceWeight *float64
SelectPriceExpFactor *float64
OrchPerfStatsURL *string
Region *string
MaxPricePerUnit *string
MinPerfScore *float64
MaxSessions *string
CurrentManifest *bool
Nvidia *string
Netint *string
TestTranscoder *bool
EthAcctAddr *string
EthPassword *string
EthKeystorePath *string
EthOrchAddr *string
EthUrl *string
TxTimeout *time.Duration
MaxTxReplacements *int
GasLimit *int
MinGasPrice *int64
MaxGasPrice *int
InitializeRound *bool
InitializeRoundMaxDelay *time.Duration
TicketEV *string
MaxFaceValue *string
MaxTicketEV *string
MaxTotalEV *string
DepositMultiplier *int
PricePerUnit *string
PixelsPerUnit *string
PriceFeedAddr *string
AutoAdjustPrice *bool
PricePerBroadcaster *string
BlockPollingInterval *int
Redeemer *bool
RedeemerAddr *string
Reward *bool
Monitor *bool
MetricsPerStream *bool
MetricsExposeClientIP *bool
MetadataQueueUri *string
MetadataAmqpExchange *string
MetadataPublishTimeout *time.Duration
Datadir *string
Objectstore *string
Recordstore *string
FVfailGsBucket *string
FVfailGsKey *string
AuthWebhookURL *string
OrchWebhookURL *string
OrchBlacklist *string
TestOrchAvail *bool
}

// DefaultLivepeerConfig creates LivepeerConfig exactly the same as when no flags are passed to the livepeer process.
Expand Down Expand Up @@ -190,6 +191,7 @@
defaultMaxGasPrice := 0
defaultEthController := ""
defaultInitializeRound := false
defaultInitializeRoundMaxDelay := 30 * time.Second

Check warning on line 194 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L194

Added line #L194 was not covered by tests
defaultTicketEV := "8000000000"
defaultMaxFaceValue := "0"
defaultMaxTicketEV := "3000000000000"
Expand Down Expand Up @@ -264,36 +266,37 @@
TestTranscoder: &defaultTestTranscoder,

// Onchain:
EthAcctAddr: &defaultEthAcctAddr,
EthPassword: &defaultEthPassword,
EthKeystorePath: &defaultEthKeystorePath,
EthOrchAddr: &defaultEthOrchAddr,
EthUrl: &defaultEthUrl,
TxTimeout: &defaultTxTimeout,
MaxTxReplacements: &defaultMaxTxReplacements,
GasLimit: &defaultGasLimit,
MaxGasPrice: &defaultMaxGasPrice,
EthController: &defaultEthController,
InitializeRound: &defaultInitializeRound,
TicketEV: &defaultTicketEV,
MaxFaceValue: &defaultMaxFaceValue,
MaxTicketEV: &defaultMaxTicketEV,
MaxTotalEV: &defaultMaxTotalEV,
DepositMultiplier: &defaultDepositMultiplier,
MaxPricePerUnit: &defaultMaxPricePerUnit,
PixelsPerUnit: &defaultPixelsPerUnit,
PriceFeedAddr: &defaultPriceFeedAddr,
AutoAdjustPrice: &defaultAutoAdjustPrice,
PricePerBroadcaster: &defaultPricePerBroadcaster,
BlockPollingInterval: &defaultBlockPollingInterval,
Redeemer: &defaultRedeemer,
RedeemerAddr: &defaultRedeemerAddr,
Monitor: &defaultMonitor,
MetricsPerStream: &defaultMetricsPerStream,
MetricsExposeClientIP: &defaultMetricsExposeClientIP,
MetadataQueueUri: &defaultMetadataQueueUri,
MetadataAmqpExchange: &defaultMetadataAmqpExchange,
MetadataPublishTimeout: &defaultMetadataPublishTimeout,
EthAcctAddr: &defaultEthAcctAddr,
EthPassword: &defaultEthPassword,
EthKeystorePath: &defaultEthKeystorePath,
EthOrchAddr: &defaultEthOrchAddr,
EthUrl: &defaultEthUrl,
TxTimeout: &defaultTxTimeout,
MaxTxReplacements: &defaultMaxTxReplacements,
GasLimit: &defaultGasLimit,
MaxGasPrice: &defaultMaxGasPrice,
EthController: &defaultEthController,
InitializeRound: &defaultInitializeRound,
InitializeRoundMaxDelay: &defaultInitializeRoundMaxDelay,
TicketEV: &defaultTicketEV,
MaxFaceValue: &defaultMaxFaceValue,
MaxTicketEV: &defaultMaxTicketEV,
MaxTotalEV: &defaultMaxTotalEV,
DepositMultiplier: &defaultDepositMultiplier,
MaxPricePerUnit: &defaultMaxPricePerUnit,
PixelsPerUnit: &defaultPixelsPerUnit,
PriceFeedAddr: &defaultPriceFeedAddr,
AutoAdjustPrice: &defaultAutoAdjustPrice,
PricePerBroadcaster: &defaultPricePerBroadcaster,
BlockPollingInterval: &defaultBlockPollingInterval,
Redeemer: &defaultRedeemer,
RedeemerAddr: &defaultRedeemerAddr,
Monitor: &defaultMonitor,
MetricsPerStream: &defaultMetricsPerStream,
MetricsExposeClientIP: &defaultMetricsExposeClientIP,
MetadataQueueUri: &defaultMetadataQueueUri,
MetadataAmqpExchange: &defaultMetadataAmqpExchange,
MetadataPublishTimeout: &defaultMetadataPublishTimeout,

Check warning on line 299 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L269-L299

Added lines #L269 - L299 were not covered by tests

// Ingest:
HttpIngest: &defaultHttpIngest,
Expand Down Expand Up @@ -975,7 +978,7 @@
if *cfg.InitializeRound {
// Start round initializer
// The node will only initialize rounds if it in the upcoming active set for the round
initializer := eth.NewRoundInitializer(n.Eth, timeWatcher)
initializer := eth.NewRoundInitializer(n.Eth, timeWatcher, *cfg.InitializeRoundMaxDelay)

Check warning on line 981 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L981

Added line #L981 was not covered by tests
go func() {
if err := initializer.Start(); err != nil {
serviceErr <- err
Expand Down
95 changes: 23 additions & 72 deletions eth/roundinitializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

import (
"math/big"
"math/rand"
"sync"
"time"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/event"
"github.com/golang/glog"
)
Expand All @@ -29,20 +30,22 @@
// This selection process is purely a client side implementation that attempts to minimize on-chain transaction collisions, but
// collisions are still possible if initialization transactions are submitted by parties that are not using this selection process
type RoundInitializer struct {
client LivepeerEthClient
tw timeWatcher
quit chan struct{}
maxDelay time.Duration
client LivepeerEthClient
tw timeWatcher
quit chan struct{}

nextRoundStartL1Block *big.Int
mu sync.Mutex
}

// NewRoundInitializer creates a RoundInitializer instance
func NewRoundInitializer(client LivepeerEthClient, tw timeWatcher) *RoundInitializer {
func NewRoundInitializer(client LivepeerEthClient, tw timeWatcher, maxDelay time.Duration) *RoundInitializer {
return &RoundInitializer{
client: client,
tw: tw,
quit: make(chan struct{}),
maxDelay: maxDelay,
client: client,
tw: tw,
quit: make(chan struct{}),
}
}

Expand Down Expand Up @@ -104,23 +107,23 @@
r.mu.Lock()
defer r.mu.Unlock()

currentL1Blk := r.tw.LastSeenL1Block()
lastInitializedL1BlkHash := r.tw.LastInitializedL1BlockHash()

epochSeed := r.currentEpochSeed(currentL1Blk, r.nextRoundStartL1Block, lastInitializedL1BlkHash)

ok, err := r.shouldInitialize(epochSeed)
victorges marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
if r.tw.LastSeenL1Block().Cmp(r.nextRoundStartL1Block) < 0 {
// Round already initialized
return nil

Check warning on line 112 in eth/roundinitializer.go

View check run for this annotation

Codecov / codecov/patch

eth/roundinitializer.go#L112

Added line #L112 was not covered by tests
}

// Noop if the caller should not initialize the round
if !ok {
return nil
if r.maxDelay > 0 {
randDelay := time.Duration(rand.Int63n(int64(r.maxDelay)))
glog.Infof("Waiting %v before attempting to initialize round", randDelay)
time.Sleep(randDelay)

Check warning on line 118 in eth/roundinitializer.go

View check run for this annotation

Codecov / codecov/patch

eth/roundinitializer.go#L116-L118

Added lines #L116 - L118 were not covered by tests
leszko marked this conversation as resolved.
Show resolved Hide resolved

if r.tw.LastSeenL1Block().Cmp(r.nextRoundStartL1Block) < 0 {
glog.Infof("Round is already initialized, not initializing")
return nil

Check warning on line 122 in eth/roundinitializer.go

View check run for this annotation

Codecov / codecov/patch

eth/roundinitializer.go#L120-L122

Added lines #L120 - L122 were not covered by tests
}
}

currentRound := new(big.Int).Add(r.tw.LastInitializedRound(), big.NewInt(1))

glog.Infof("New round - preparing to initialize round to join active set, current round is %d", currentRound)

tx, err := r.client.InitializeRound()
Expand All @@ -136,55 +139,3 @@

return nil
}

func (r *RoundInitializer) shouldInitialize(epochSeed *big.Int) (bool, error) {
transcoders, err := r.client.TranscoderPool()
if err != nil {
return false, err
}

numActive := big.NewInt(int64(len(transcoders)))

// Should not initialize if the upcoming active set is empty
if numActive.Cmp(big.NewInt(0)) == 0 {
return false, nil
}

// Find the caller's rank in the upcoming active set
rank := int64(-1)
maxRank := numActive.Int64()
caller := r.client.Account().Address
for i := int64(0); i < maxRank; i++ {
if transcoders[i].Address == caller {
rank = i
break
}
}

// Should not initialize if the caller is not in the upcoming active set
if rank == -1 {
return false, nil
}

// Use the seed to select a position within the active set
selection := new(big.Int).Mod(epochSeed, numActive)
// Should not initialize if the selection does not match the caller's rank in the active set
if selection.Int64() != int64(rank) {
return false, nil
}

// If the selection matches the caller's rank the caller should initialize the round
return true, nil
}

// Returns the seed used to select a round initializer in the current epoch for the current round
// This seed is not meant to be unpredictable. The only requirement for the seed is that it is calculated the same way for each
// party running the round initializer
func (r *RoundInitializer) currentEpochSeed(currentL1Block, roundStartL1Block *big.Int, lastInitializedL1BlkHash [32]byte) *big.Int {
epochNum := new(big.Int).Sub(currentL1Block, roundStartL1Block)
epochNum.Div(epochNum, epochL1Blocks)

// The seed for the current epoch is calculated as:
// keccak256(lastInitializedL1BlkHash | epochNum)
return crypto.Keccak256Hash(append(lastInitializedL1BlkHash[:], epochNum.Bytes()...)).Big()
}
Loading
Loading