Skip to content

Commit

Permalink
Allow for out of order nonce #2710
Browse files Browse the repository at this point in the history
  • Loading branch information
cyberj0g authored Jan 13, 2023
2 parents a5606ca + c6237ae commit 11af0a6
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 27 deletions.
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -688,8 +688,6 @@ github.com/livepeer/joy4 v0.1.2-0.20191121080656-b2fea45cbded h1:ZQlvR5RB4nfT+cO
github.com/livepeer/joy4 v0.1.2-0.20191121080656-b2fea45cbded/go.mod h1:xkDdm+akniYxVT9KW1Y2Y7Hso6aW+rZObz3nrA9yTHw=
github.com/livepeer/livepeer-data v0.4.11 h1:Sv+ss8e4vcscnMWLxcRJ2g3sNIHyQ3RzCtgEelfGPzw=
github.com/livepeer/livepeer-data v0.4.11/go.mod h1:VIbJRdyH2Tas8EgLVkP79IPMepFDOv0dgHYLEZsCaf4=
github.com/livepeer/lpms v0.0.0-20221123192553-7cef5fc8c1d2 h1:q9DU5UATq+3zzpG8MmuGkLQKAalqo/ivXpbbimzuk4U=
github.com/livepeer/lpms v0.0.0-20221123192553-7cef5fc8c1d2/go.mod h1:Hr/JhxxPDipOVd4ZrGYWrdJfpVF8/SEI0nNr2ctAlkM=
github.com/livepeer/lpms v0.0.0-20230106125835-78ae44836422 h1:AIm6737jFdSrUbzUFneCZuxy8OBXllez6k6vRzp93JE=
github.com/livepeer/lpms v0.0.0-20230106125835-78ae44836422/go.mod h1:Hr/JhxxPDipOVd4ZrGYWrdJfpVF8/SEI0nNr2ctAlkM=
github.com/livepeer/m3u8 v0.11.1 h1:VkUJzfNTyjy9mqsgp5JPvouwna8wGZMvd/gAfT5FinU=
Expand Down
33 changes: 22 additions & 11 deletions pm/recipient.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ var errInsufficientSenderReserve = errors.New("insufficient sender reserve")
// maxWinProb = 2^256 - 1
var maxWinProb = new(big.Int).Sub(new(big.Int).Lsh(big.NewInt(1), 256), big.NewInt(1))

// max number of sender nonces for a given recipient random hash
var maxSenderNonces = 50

var paramsExpirationBlock = big.NewInt(10)
var paramsExpiryBuffer = int64(1)

Expand Down Expand Up @@ -86,7 +89,7 @@ type recipient struct {
maxfacevalue *big.Int

senderNonces map[string]*struct {
nonce uint32
nonceSeen map[uint32]bool
expirationBlock *big.Int
}
senderNoncesLock sync.Mutex
Expand Down Expand Up @@ -124,7 +127,7 @@ func NewRecipientWithSecret(addr ethcommon.Address, broker Broker, val Validator
secret: secret,
maxfacevalue: big.NewInt(0),
senderNonces: make(map[string]*struct {
nonce uint32
nonceSeen map[uint32]bool
expirationBlock *big.Int
}),
cfg: cfg,
Expand Down Expand Up @@ -363,16 +366,24 @@ func (r *recipient) updateSenderNonce(rand *big.Int, ticket *Ticket) error {
defer r.senderNoncesLock.Unlock()

randStr := rand.String()
sn, ok := r.senderNonces[randStr]
if ok && ticket.SenderNonce <= sn.nonce {
return errors.Errorf("invalid ticket senderNonce sender=%v nonce=%v highest=%v", ticket.Sender.Hex(), ticket.SenderNonce, sn.nonce)
senderNonces, randKeySeen := r.senderNonces[randStr]
if randKeySeen {
_, isSeen := senderNonces.nonceSeen[ticket.SenderNonce]
if isSeen {
return errors.Errorf("invalid ticket senderNonce: already seen sender=%v nonce=%v", ticket.Sender.Hex(), ticket.SenderNonce)
}
} else {
r.senderNonces[randStr] = &struct {
nonceSeen map[uint32]bool
expirationBlock *big.Int
}{make(map[uint32]bool), ticket.ParamsExpirationBlock}
}

r.senderNonces[randStr] = &struct {
nonce uint32
expirationBlock *big.Int
}{ticket.SenderNonce, ticket.ParamsExpirationBlock}

// check nonce map size
if len(r.senderNonces[randStr].nonceSeen) >= maxSenderNonces {
return errors.Errorf("invalid ticket senderNonce: too many values sender=%v nonce=%v", ticket.Sender.Hex(), ticket.SenderNonce)
}
// add new nonce
r.senderNonces[randStr].nonceSeen[ticket.SenderNonce] = true
return nil
}

Expand Down
45 changes: 31 additions & 14 deletions pm/recipient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,8 @@ func TestReceiveTicket_ValidNonWinningTicket(t *testing.T) {
recipientRand := genRecipientRand(sender, secret, params)
senderNonce := r.(*recipient).senderNonces[recipientRand.String()]

if senderNonce.nonce != newSenderNonce {
t.Errorf("expected senderNonce to be %d, got %d", newSenderNonce, senderNonce.nonce)
if _, ok := senderNonce.nonceSeen[newSenderNonce]; !ok {
t.Errorf("expected senderNonce to exist: %d", newSenderNonce)
}
}

Expand Down Expand Up @@ -257,8 +257,8 @@ func TestReceiveTicket_ValidWinningTicket(t *testing.T) {
recipientRand := genRecipientRand(sender, secret, params)
senderNonce := r.(*recipient).senderNonces[recipientRand.String()]

if senderNonce.nonce != newSenderNonce {
t.Errorf("expected senderNonce to be %d, got %d", newSenderNonce, senderNonce.nonce)
if _, ok := senderNonce.nonceSeen[newSenderNonce]; !ok {
t.Errorf("expected senderNonce to exist: %d", newSenderNonce)
}
}

Expand Down Expand Up @@ -308,6 +308,7 @@ func TestReceiveTicket_InvalidSenderNonce(t *testing.T) {
}

func TestReceiveTicket_ValidNonWinningTicket_Concurrent(t *testing.T) {
assert := assert.New(t)
sender, b, v, gm, sm, tm, cfg, sig := newRecipientFixtureOrFatal(t)
r := newRecipientOrFatal(t, RandAddress(), b, v, gm, sm, tm, cfg)
params, err := r.TicketParams(sender, big.NewRat(1, 1))
Expand All @@ -330,11 +331,25 @@ func TestReceiveTicket_ValidNonWinningTicket_Concurrent(t *testing.T) {
}
}(uint32(i))
}

wg.Wait()
assert.Zero(errCount)
}

if errCount == 0 {
t.Error("expected more than zero senderNonce errors for concurrent ticket receipt")
func TestReceiveTicket_NonceMapFill(t *testing.T) {
assert := assert.New(t)
sender, b, v, gm, sm, tm, cfg, sig := newRecipientFixtureOrFatal(t)
r := newRecipientOrFatal(t, RandAddress(), b, v, gm, sm, tm, cfg)
params, err := r.TicketParams(sender, big.NewRat(1, 1))
require.Nil(t, err)
// fill nonce map to capacity
for i := 0; i < maxSenderNonces+1; i++ {
ticket := newTicket(sender, params, uint32(i))
_, _, err := r.ReceiveTicket(ticket, sig, params.Seed)
if i < maxSenderNonces {
assert.NoError(err)
} else {
assert.Error(err)
}
}
}

Expand Down Expand Up @@ -656,7 +671,7 @@ func TestSenderNoncesCleanupLoop(t *testing.T) {
tm: tm,
quit: make(chan struct{}),
senderNonces: make(map[string]*struct {
nonce uint32
nonceSeen map[uint32]bool
expirationBlock *big.Int
}),
}
Expand All @@ -666,17 +681,19 @@ func TestSenderNoncesCleanupLoop(t *testing.T) {
rand1 := "charizard"
rand2 := "raichu"
r.senderNonces[rand0] = &struct {
nonce uint32
nonceSeen map[uint32]bool
expirationBlock *big.Int
}{1, big.NewInt(3)}
}{map[uint32]bool{1: true}, big.NewInt(3)}

r.senderNonces[rand1] = &struct {
nonce uint32
nonceSeen map[uint32]bool
expirationBlock *big.Int
}{1, big.NewInt(2)}
}{map[uint32]bool{1: true}, big.NewInt(2)}

r.senderNonces[rand2] = &struct {
nonce uint32
nonceSeen map[uint32]bool
expirationBlock *big.Int
}{1, big.NewInt(1)}
}{map[uint32]bool{1: true}, big.NewInt(1)}

go r.senderNoncesCleanupLoop()
time.Sleep(20 * time.Millisecond)
Expand Down
16 changes: 16 additions & 0 deletions test/e2e/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,22 @@ func pushSegmentsBroadcaster(t *testing.T, b *livepeer, numSegs int) {
}
}

func pushSegmentsParallelBroadcaster(t *testing.T, b *livepeer, numSegs int) {
assert := assert.New(t)

var wg sync.WaitGroup
mid := common.RandName()
for i := 0; i < numSegs; i++ {
wg.Add(1)
go func(mid string, seqNo int) {
assert.Nil(pushSegmentBroadcaster(b, mid, i))
wg.Done()
}(mid, i)
}

wg.Wait()
}

func pushSegmentBroadcaster(b *livepeer, manifestID string, seqNo int) error {
data, err := ioutil.ReadFile("test.flv")
rdr := bytes.NewReader(data)
Expand Down
3 changes: 3 additions & 0 deletions test/e2e/http_push_broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,7 @@ func TestHTTPPushBroadcaster(t *testing.T) {

// Sequential requests
pushSegmentsBroadcaster(t, b, 3)

// Parallel requests
pushSegmentsParallelBroadcaster(t, b, 5)
}

0 comments on commit 11af0a6

Please sign in to comment.