diff --git a/core/playlistmanager.go b/core/playlistmanager.go index f5d974f01c..6357aa5de0 100644 --- a/core/playlistmanager.go +++ b/core/playlistmanager.go @@ -57,7 +57,7 @@ type BasicPlaylistManager struct { mediaLists map[string]*m3u8.MediaPlaylist mapSync *sync.RWMutex jsonList *JsonPlaylist - jsonListWriteQueue *drivers.OvewriteQueue + jsonListWriteQueue *drivers.OverwriteQueue jsonListSync *sync.Mutex } @@ -242,7 +242,8 @@ func (mgr *BasicPlaylistManager) makeNewOverwriteQueue() { if mgr.jsonListWriteQueue != nil { mgr.jsonListWriteQueue.StopAfter(JsonPlaylistQuitTimeout) } - mgr.jsonListWriteQueue = drivers.NewOverwriteQueue(mgr.recordSession, mgr.jsonList.name, "json playlist", + mgr.jsonListWriteQueue = drivers.NewOverwriteQueue(mgr.recordSession, mgr.jsonList.name, + fmt.Sprintf("json playlist for manifestId=%s", mgr.manifestID), jsonPlaylistMaxRetries, JsonPlaylistInitialTimeout, JsonPlaylistMaxTimeout) } diff --git a/drivers/overwrite_queue.go b/drivers/overwrite_queue.go index 4fff1424bb..3a568be372 100644 --- a/drivers/overwrite_queue.go +++ b/drivers/overwrite_queue.go @@ -8,11 +8,12 @@ import ( ) const ( - timeoutMultiplier = 1.5 + timeoutMultiplier = 1.5 + overwriteQueueLength = 32 ) type ( - OvewriteQueue struct { + OverwriteQueue struct { desc string name string session OSSession @@ -24,15 +25,15 @@ type ( } ) -func NewOverwriteQueue(session OSSession, name, desc string, maxRetries int, initialTimeout, maxTimeout time.Duration) *OvewriteQueue { - oq := &OvewriteQueue{ +func NewOverwriteQueue(session OSSession, name, desc string, maxRetries int, initialTimeout, maxTimeout time.Duration) *OverwriteQueue { + oq := &OverwriteQueue{ desc: desc, name: name, maxRetries: maxRetries, session: session, initialTimeout: initialTimeout, maxTimeout: maxTimeout, - queue: make(chan []byte, 32), + queue: make(chan []byte, overwriteQueueLength), quit: make(chan struct{}), } if maxRetries < 1 { @@ -43,35 +44,21 @@ func NewOverwriteQueue(session OSSession, name, desc string, maxRetries int, ini } // Save queues data to be saved -func (oq *OvewriteQueue) Save(data []byte) { +func (oq *OverwriteQueue) Save(data []byte) { oq.queue <- data } // StopAfter stops reading loop after some time -func (oq *OvewriteQueue) StopAfter(pause time.Duration) { +func (oq *OverwriteQueue) StopAfter(pause time.Duration) { + glog.Infof("=======> stop after %s", pause) + // panic(nil) go func(p time.Duration) { time.Sleep(p) close(oq.quit) }(pause) } -// waitForQueueToClear used in tests -func (oq *OvewriteQueue) waitForQueueToClear(timeout time.Duration) { - // wait for work to be queued - time.Sleep(5 * time.Millisecond) - start := time.Now() - for { - if len(oq.queue) == 0 { - return - } - if time.Since(start) > timeout { - return - } - time.Sleep(20 * time.Millisecond) - } -} - -func (oq *OvewriteQueue) workerLoop() { +func (oq *OverwriteQueue) workerLoop() { var err error var took time.Duration for { @@ -106,7 +93,7 @@ func (oq *OvewriteQueue) workerLoop() { } } -func (oq *OvewriteQueue) getLastMessage(current []byte) []byte { +func (oq *OverwriteQueue) getLastMessage(current []byte) []byte { res := current for { select { diff --git a/drivers/overwrite_queue_test.go b/drivers/overwrite_queue_test.go index dba8ca5439..9dd25c3855 100644 --- a/drivers/overwrite_queue_test.go +++ b/drivers/overwrite_queue_test.go @@ -6,9 +6,25 @@ import ( "time" ) +// waitForQueueToClear used in tests +func (oq *OverwriteQueue) waitForQueueToClear(timeout time.Duration) { + // wait for work to be queued + time.Sleep(5 * time.Millisecond) + start := time.Now() + for { + if len(oq.queue) == 0 { + return + } + if time.Since(start) > timeout { + return + } + time.Sleep(20 * time.Millisecond) + } +} + func TestOverwriteQueueShouldCallSave(t *testing.T) { timeout := 500 * time.Millisecond - mos := &MockOSSession{} + mos := NewMockOSSession() oq := NewOverwriteQueue(mos, "f1", "save", 2, timeout, 15*time.Second) data1 := []byte("data01") @@ -25,7 +41,7 @@ func TestOverwriteQueueShouldCallSave(t *testing.T) { func TestOverwriteQueueShouldRetry(t *testing.T) { timeout := 500 * time.Millisecond - mos := &MockOSSession{} + mos := NewMockOSSession() oq := NewOverwriteQueue(mos, "f1", "retry", 2, timeout, 15*time.Second) data1 := []byte("data01") @@ -44,8 +60,7 @@ func TestOverwriteQueueShouldRetry(t *testing.T) { func TestOverwriteQueueShouldUseLastValue(t *testing.T) { timeout := 300 * time.Millisecond - mos := &MockOSSession{} - mos.init() + mos := NewMockOSSession() oq := NewOverwriteQueue(mos, "f1", "use last", 2, timeout, 15*time.Second) dataw1 := []byte("dataw01") @@ -61,10 +76,11 @@ func TestOverwriteQueueShouldUseLastValue(t *testing.T) { <-mos.back oq.Save(data2) oq.Save(data3) - mos.waitCh <- nil + mos.waitCh <- struct{}{} oq.waitForQueueToClear(5 * time.Second) mos.AssertExpectations(t) + mos.AssertNotCalled(t, "SaveData", "f1", data2, meta, timeout) oq.StopAfter(0) time.Sleep(10 * time.Millisecond) } diff --git a/drivers/session_mock.go b/drivers/session_mock.go index cae2d290cb..33674f05b5 100644 --- a/drivers/session_mock.go +++ b/drivers/session_mock.go @@ -2,7 +2,6 @@ package drivers import ( "context" - "fmt" "time" "github.com/livepeer/go-livepeer/net" @@ -12,20 +11,21 @@ import ( type MockOSSession struct { mock.Mock waitForCh bool - waitCh chan interface{} - back chan interface{} + waitCh chan struct{} + back chan struct{} } -func (s *MockOSSession) init() { - s.waitCh = make(chan interface{}) - s.back = make(chan interface{}) +func NewMockOSSession() *MockOSSession { + return &MockOSSession{ + waitCh: make(chan struct{}), + back: make(chan struct{}), + } } func (s *MockOSSession) SaveData(name string, data []byte, meta map[string]string, timeout time.Duration) (string, error) { - fmt.Printf("mock SaveData %s %s %s\n", name, data, timeout) args := s.Called(name, data, meta, timeout) if s.waitForCh { - s.back <- nil + s.back <- struct{}{} <-s.waitCh s.waitForCh = false } diff --git a/server/broadcast_test.go b/server/broadcast_test.go index 027bd28798..aa2240aeec 100644 --- a/server/broadcast_test.go +++ b/server/broadcast_test.go @@ -1378,8 +1378,11 @@ func TestVerifier_Invocation(t *testing.T) { func TestVerifier_Verify(t *testing.T) { assert := assert.New(t) - os := drivers.NewMemoryDriver(nil).NewSession("") + oldjpqt := core.JsonPlaylistQuitTimeout + defer func() { + core.JsonPlaylistQuitTimeout = oldjpqt + }() core.JsonPlaylistQuitTimeout = 0 * time.Second c := core.NewBasicPlaylistManager(core.ManifestID("streamName"), os, nil) cxn := &rtmpConnection{ diff --git a/server/push_test.go b/server/push_test.go index 4f7ea95c1c..8431045a72 100644 --- a/server/push_test.go +++ b/server/push_test.go @@ -169,6 +169,10 @@ func TestPush_MultipartReturn(t *testing.T) { osd := drivers.NewMemoryDriver(url) osSession := osd.NewSession("testPath") + oldjpqt := core.JsonPlaylistQuitTimeout + defer func() { + core.JsonPlaylistQuitTimeout = oldjpqt + }() core.JsonPlaylistQuitTimeout = 0 * time.Second pl := core.NewBasicPlaylistManager("xx", osSession, nil) @@ -1019,6 +1023,12 @@ func TestPush_WebhookRequestURL(t *testing.T) { } func TestPush_OSPerStream(t *testing.T) { + oldjpqt := core.JsonPlaylistQuitTimeout + defer func() { + core.JsonPlaylistQuitTimeout = oldjpqt + }() + core.JsonPlaylistQuitTimeout = 0 * time.Second + lpmon.NodeID = "testNode" drivers.Testing = true assert := assert.New(t) @@ -1222,6 +1232,10 @@ func TestPush_ConcurrentSegments(t *testing.T) { } func TestPush_ReuseIntmidWithDiffExtmid(t *testing.T) { + oldjpqt := core.JsonPlaylistQuitTimeout + defer func() { + core.JsonPlaylistQuitTimeout = oldjpqt + }() defer goleak.VerifyNone(t, common.IgnoreRoutines()...) core.JsonPlaylistQuitTimeout = 0 * time.Second