Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
darkdarkdragon committed Jul 20, 2021
1 parent 9dd88db commit f0c59f4
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 41 deletions.
5 changes: 3 additions & 2 deletions core/playlistmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type BasicPlaylistManager struct {
mediaLists map[string]*m3u8.MediaPlaylist
mapSync *sync.RWMutex
jsonList *JsonPlaylist
jsonListWriteQueue *drivers.OvewriteQueue
jsonListWriteQueue *drivers.OverwriteQueue
jsonListSync *sync.Mutex
}

Expand Down Expand Up @@ -242,7 +242,8 @@ func (mgr *BasicPlaylistManager) makeNewOverwriteQueue() {
if mgr.jsonListWriteQueue != nil {
mgr.jsonListWriteQueue.StopAfter(JsonPlaylistQuitTimeout)
}
mgr.jsonListWriteQueue = drivers.NewOverwriteQueue(mgr.recordSession, mgr.jsonList.name, "json playlist",
mgr.jsonListWriteQueue = drivers.NewOverwriteQueue(mgr.recordSession, mgr.jsonList.name,
fmt.Sprintf("json playlist for manifestId=%s", mgr.manifestID),
jsonPlaylistMaxRetries, JsonPlaylistInitialTimeout, JsonPlaylistMaxTimeout)
}

Expand Down
37 changes: 12 additions & 25 deletions drivers/overwrite_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ import (
)

const (
timeoutMultiplier = 1.5
timeoutMultiplier = 1.5
overwriteQueueLength = 32
)

type (
OvewriteQueue struct {
OverwriteQueue struct {
desc string
name string
session OSSession
Expand All @@ -24,15 +25,15 @@ type (
}
)

func NewOverwriteQueue(session OSSession, name, desc string, maxRetries int, initialTimeout, maxTimeout time.Duration) *OvewriteQueue {
oq := &OvewriteQueue{
func NewOverwriteQueue(session OSSession, name, desc string, maxRetries int, initialTimeout, maxTimeout time.Duration) *OverwriteQueue {
oq := &OverwriteQueue{
desc: desc,
name: name,
maxRetries: maxRetries,
session: session,
initialTimeout: initialTimeout,
maxTimeout: maxTimeout,
queue: make(chan []byte, 32),
queue: make(chan []byte, overwriteQueueLength),
quit: make(chan struct{}),
}
if maxRetries < 1 {
Expand All @@ -43,35 +44,21 @@ func NewOverwriteQueue(session OSSession, name, desc string, maxRetries int, ini
}

// Save queues data to be saved
func (oq *OvewriteQueue) Save(data []byte) {
func (oq *OverwriteQueue) Save(data []byte) {
oq.queue <- data
}

// StopAfter stops reading loop after some time
func (oq *OvewriteQueue) StopAfter(pause time.Duration) {
func (oq *OverwriteQueue) StopAfter(pause time.Duration) {
glog.Infof("=======> stop after %s", pause)
// panic(nil)
go func(p time.Duration) {
time.Sleep(p)
close(oq.quit)
}(pause)
}

// waitForQueueToClear used in tests
func (oq *OvewriteQueue) waitForQueueToClear(timeout time.Duration) {
// wait for work to be queued
time.Sleep(5 * time.Millisecond)
start := time.Now()
for {
if len(oq.queue) == 0 {
return
}
if time.Since(start) > timeout {
return
}
time.Sleep(20 * time.Millisecond)
}
}

func (oq *OvewriteQueue) workerLoop() {
func (oq *OverwriteQueue) workerLoop() {
var err error
var took time.Duration
for {
Expand Down Expand Up @@ -106,7 +93,7 @@ func (oq *OvewriteQueue) workerLoop() {
}
}

func (oq *OvewriteQueue) getLastMessage(current []byte) []byte {
func (oq *OverwriteQueue) getLastMessage(current []byte) []byte {
res := current
for {
select {
Expand Down
26 changes: 21 additions & 5 deletions drivers/overwrite_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,25 @@ import (
"time"
)

// waitForQueueToClear used in tests
func (oq *OverwriteQueue) waitForQueueToClear(timeout time.Duration) {
// wait for work to be queued
time.Sleep(5 * time.Millisecond)
start := time.Now()
for {
if len(oq.queue) == 0 {
return
}
if time.Since(start) > timeout {
return
}
time.Sleep(20 * time.Millisecond)
}
}

func TestOverwriteQueueShouldCallSave(t *testing.T) {
timeout := 500 * time.Millisecond
mos := &MockOSSession{}
mos := NewMockOSSession()
oq := NewOverwriteQueue(mos, "f1", "save", 2, timeout, 15*time.Second)

data1 := []byte("data01")
Expand All @@ -25,7 +41,7 @@ func TestOverwriteQueueShouldCallSave(t *testing.T) {

func TestOverwriteQueueShouldRetry(t *testing.T) {
timeout := 500 * time.Millisecond
mos := &MockOSSession{}
mos := NewMockOSSession()
oq := NewOverwriteQueue(mos, "f1", "retry", 2, timeout, 15*time.Second)

data1 := []byte("data01")
Expand All @@ -44,8 +60,7 @@ func TestOverwriteQueueShouldRetry(t *testing.T) {

func TestOverwriteQueueShouldUseLastValue(t *testing.T) {
timeout := 300 * time.Millisecond
mos := &MockOSSession{}
mos.init()
mos := NewMockOSSession()
oq := NewOverwriteQueue(mos, "f1", "use last", 2, timeout, 15*time.Second)

dataw1 := []byte("dataw01")
Expand All @@ -61,10 +76,11 @@ func TestOverwriteQueueShouldUseLastValue(t *testing.T) {
<-mos.back
oq.Save(data2)
oq.Save(data3)
mos.waitCh <- nil
mos.waitCh <- struct{}{}

oq.waitForQueueToClear(5 * time.Second)
mos.AssertExpectations(t)
mos.AssertNotCalled(t, "SaveData", "f1", data2, meta, timeout)
oq.StopAfter(0)
time.Sleep(10 * time.Millisecond)
}
16 changes: 8 additions & 8 deletions drivers/session_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package drivers

import (
"context"
"fmt"
"time"

"github.com/livepeer/go-livepeer/net"
Expand All @@ -12,20 +11,21 @@ import (
type MockOSSession struct {
mock.Mock
waitForCh bool
waitCh chan interface{}
back chan interface{}
waitCh chan struct{}
back chan struct{}
}

func (s *MockOSSession) init() {
s.waitCh = make(chan interface{})
s.back = make(chan interface{})
func NewMockOSSession() *MockOSSession {
return &MockOSSession{
waitCh: make(chan struct{}),
back: make(chan struct{}),
}
}

func (s *MockOSSession) SaveData(name string, data []byte, meta map[string]string, timeout time.Duration) (string, error) {
fmt.Printf("mock SaveData %s %s %s\n", name, data, timeout)
args := s.Called(name, data, meta, timeout)
if s.waitForCh {
s.back <- nil
s.back <- struct{}{}
<-s.waitCh
s.waitForCh = false
}
Expand Down
5 changes: 4 additions & 1 deletion server/broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1378,8 +1378,11 @@ func TestVerifier_Invocation(t *testing.T) {

func TestVerifier_Verify(t *testing.T) {
assert := assert.New(t)

os := drivers.NewMemoryDriver(nil).NewSession("")
oldjpqt := core.JsonPlaylistQuitTimeout
defer func() {
core.JsonPlaylistQuitTimeout = oldjpqt
}()
core.JsonPlaylistQuitTimeout = 0 * time.Second
c := core.NewBasicPlaylistManager(core.ManifestID("streamName"), os, nil)
cxn := &rtmpConnection{
Expand Down
14 changes: 14 additions & 0 deletions server/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ func TestPush_MultipartReturn(t *testing.T) {
osd := drivers.NewMemoryDriver(url)
osSession := osd.NewSession("testPath")

oldjpqt := core.JsonPlaylistQuitTimeout
defer func() {
core.JsonPlaylistQuitTimeout = oldjpqt
}()
core.JsonPlaylistQuitTimeout = 0 * time.Second
pl := core.NewBasicPlaylistManager("xx", osSession, nil)

Expand Down Expand Up @@ -1019,6 +1023,12 @@ func TestPush_WebhookRequestURL(t *testing.T) {
}

func TestPush_OSPerStream(t *testing.T) {
oldjpqt := core.JsonPlaylistQuitTimeout
defer func() {
core.JsonPlaylistQuitTimeout = oldjpqt
}()
core.JsonPlaylistQuitTimeout = 0 * time.Second

lpmon.NodeID = "testNode"
drivers.Testing = true
assert := assert.New(t)
Expand Down Expand Up @@ -1222,6 +1232,10 @@ func TestPush_ConcurrentSegments(t *testing.T) {
}

func TestPush_ReuseIntmidWithDiffExtmid(t *testing.T) {
oldjpqt := core.JsonPlaylistQuitTimeout
defer func() {
core.JsonPlaylistQuitTimeout = oldjpqt
}()
defer goleak.VerifyNone(t, common.IgnoreRoutines()...)
core.JsonPlaylistQuitTimeout = 0 * time.Second

Expand Down

0 comments on commit f0c59f4

Please sign in to comment.