Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rl/e2e configure orchestrator #2443

Merged
merged 13 commits into from
Jun 23, 2022
1 change: 1 addition & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,4 @@ jobs:
files: ./cover.out
name: go-livepeer
verbose: true
gcov_ignore: ./eth/stubclient.go
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#### General
- \#1333 Display git-sha in startup logging (@emranemran)
- \#2443 Add e2e tests for O configuration and resignation (@red-0ne)

#### Broadcaster
- \#2462 cmd: Delete temporary env variable LP_IS_ORCH_TESTER (@leszko)
Expand Down
22 changes: 12 additions & 10 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,26 @@ import (
"context"
"errors"
"fmt"
"github.com/golang/glog"
"github.com/livepeer/go-livepeer/core"
"io/ioutil"
"math/big"
"net"
"net/http"
"net/url"
"os"
"os/user"
"path/filepath"
"strings"
"time"

ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
"github.com/golang/glog"
"github.com/livepeer/go-livepeer/build"
"github.com/livepeer/go-livepeer/common"
"github.com/livepeer/go-livepeer/core"
"github.com/livepeer/go-livepeer/discovery"
"github.com/livepeer/go-livepeer/drivers"
"github.com/livepeer/go-livepeer/eth"
Expand All @@ -28,13 +35,6 @@ import (
"github.com/livepeer/go-livepeer/verification"
"github.com/livepeer/livepeer-data/pkg/event"
"github.com/livepeer/lpms/ffmpeg"
"io/ioutil"
"math/big"
"net"
"net/http"
"net/url"
"path/filepath"
"strings"
)

var (
Expand Down Expand Up @@ -1082,8 +1082,9 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
s.ExposeCurrentManifest = *cfg.CurrentManifest
}

srv := &http.Server{Addr: *cfg.CliAddr}
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
go func() {
s.StartCliWebserver(*cfg.CliAddr)
s.StartCliWebserver(srv)
close(wc)
}()
if n.NodeType != core.RedeemerNode {
Expand Down Expand Up @@ -1160,6 +1161,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
glog.Infof("MediaServer Done()")
return
case <-ctx.Done():
srv.Shutdown(ctx)
return
}
}
Expand Down
8 changes: 3 additions & 5 deletions server/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@ import (
"encoding/json"
"errors"
"fmt"
ethtypes "github.com/ethereum/go-ethereum/core/types"
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
"github.com/livepeer/go-livepeer/core"
"github.com/livepeer/go-livepeer/eth/types"
"github.com/livepeer/lpms/ffmpeg"
"io"
"io/ioutil"
"math/big"
Expand All @@ -19,8 +15,11 @@ import (

"github.com/ethereum/go-ethereum/accounts"
ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/livepeer/go-livepeer/core"
"github.com/livepeer/go-livepeer/eth"
"github.com/livepeer/go-livepeer/eth/types"
"github.com/livepeer/go-livepeer/pm"
"github.com/livepeer/lpms/ffmpeg"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
Expand Down Expand Up @@ -992,7 +991,6 @@ func TestVoteHandler(t *testing.T) {
handler = voteHandler(client)
status, body = postForm(handler, form)
assert.Equal(http.StatusOK, status)
assert.Equal((ethtypes.NewTx(&ethtypes.DynamicFeeTx{})).Hash().Bytes(), []byte(body))
}

// Tickets
Expand Down
8 changes: 6 additions & 2 deletions server/mediaserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ func setupServerWithCancel() (*LivepeerServer, context.CancelFunc) {
// port++
// this one really starts server (without a way to shut it down)
cliUrl := fmt.Sprintf("127.0.0.1:%d", port)
go S.StartCliWebserver(cliUrl)
srv := &http.Server{Addr: cliUrl}
go S.StartCliWebserver(srv)
port++
// sometimes LivepeerServer needs time to start
// esp if this is the only test in the suite being run (eg, via `-run)
Expand Down Expand Up @@ -134,7 +135,10 @@ func setupServerWithCancelAndPorts() (*LivepeerServer, context.CancelFunc) {
n, _ := core.NewLivepeerNode(nil, "./tmp", nil)
S, _ = NewLivepeerServer("127.0.0.1:2938", n, true, "")
go S.StartMediaServer(ctx, "127.0.0.1:9080")
go S.StartCliWebserver("127.0.0.1:9938")
go func() {
srv := &http.Server{Addr: "127.0.0.1:9938"}
S.StartCliWebserver(srv)
}()
}
return S, cancel
}
Expand Down
11 changes: 3 additions & 8 deletions server/webserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,9 @@ var vFlag *glog.Level = flag.Lookup("v").Value.(*glog.Level)

// StartCliWebserver starts web server for CLI
// blocks until exit
func (s *LivepeerServer) StartCliWebserver(bindAddr string) {
mux := s.cliWebServerHandlers(bindAddr)
srv := &http.Server{
Addr: bindAddr,
Handler: mux,
}

glog.Info("CLI server listening on ", bindAddr)
func (s *LivepeerServer) StartCliWebserver(srv *http.Server) {
srv.Handler = s.cliWebServerHandlers(srv.Addr)
glog.Info("CLI server listening on ", srv.Addr)
err := srv.ListenAndServe()
glog.Error(err)
}
Expand Down
96 changes: 96 additions & 0 deletions test/e2e/configure_orchestrator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package e2e

import (
"context"
"fmt"
"math/big"
"net/url"
"testing"
"time"

"github.com/livepeer/go-livepeer/eth"
"github.com/stretchr/testify/require"
)

func TestConfigureOrchestrator(t *testing.T) {
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
// given
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

geth := setupGeth(t, ctx)
defer terminateGeth(t, ctx, geth)

oCtx, oCancel := context.WithCancel(context.Background())
o := startOrchestratorWithNewAccount(t, oCtx, geth)
registerOrchestrator(t, o)

// orchestrator needs to restart in order to be able to process rewards, so configuration is unlocked
o = restartOrchestrator(t, ctx, oCancel, geth, o)
defer o.stop()

waitUntilOrchestratorIsConfigurable(t, o.dev.Client)

// when
configureOrchestrator(o, newCfg)

// then
assertOrchestratorConfigured(t, o, newCfg)
}

func restartOrchestrator(t *testing.T, ctx context.Context, cancel context.CancelFunc, geth *gethContainer, o *livepeer) *livepeer {
o.stop()
cancel()
return startOrchestratorWithExistingAccount(t, ctx, geth, o.cfg.EthAcctAddr, o.cfg.Datadir)
}

func waitUntilOrchestratorIsConfigurable(t *testing.T, lpEth eth.LivepeerEthClient) {
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
require := require.New(t)

for {
active, err := lpEth.IsActiveTranscoder()
require.NoError(err)

initialized, err := lpEth.CurrentRoundInitialized()
require.NoError(err)

t, err := lpEth.GetTranscoder(lpEth.Account().Address)
require.NoError(err)
rewardCalled := t.LastRewardRound.Cmp(big.NewInt(0)) > 0

if active && initialized && rewardCalled {
return
}

time.Sleep(2 * time.Second)
}
}

func configureOrchestrator(o *livepeer, cfg *orchestratorConfig) {
val := url.Values{
"blockRewardCut": {fmt.Sprintf("%v", cfg.BlockRewardCut)},
"feeShare": {fmt.Sprintf("%v", cfg.FeeShare)},
"serviceURI": {fmt.Sprintf("http://%v", cfg.ServiceURI)},
}

for {
if _, ok := httpPostWithParams(fmt.Sprintf("http://%s/setOrchestratorConfig", *o.cfg.CliAddr), val); ok {
return
}
time.Sleep(200 * time.Millisecond)
}
}

func assertOrchestratorConfigured(t *testing.T, o *livepeer, cfg *orchestratorConfig) {
require := require.New(t)

transPool, err := o.dev.Client.TranscoderPool()
uri, errURI := o.dev.Client.GetServiceURI(o.dev.Client.Account().Address)

require.NoError(err)
require.NoError(errURI)
require.Len(transPool, 1)
trans := transPool[0]
require.Equal(eth.FromPerc(cfg.FeeShare), trans.FeeShare)
require.Equal(eth.FromPerc(cfg.BlockRewardCut), trans.RewardCut)
require.Equal(fmt.Sprintf("http://%v", cfg.ServiceURI), uri)
}
Loading