From 6104b57a76626f7b847b807ee07c543f90acbcad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lvaro=20Velad=20Galv=C3=A1n?= Date: Thu, 15 Jun 2023 22:33:14 +0200 Subject: [PATCH] feat: Add support to streamDataCallback when using prefetch (#5310) This change improve the latency in LL-DASH streams when using prefetch --- lib/media/segment_prefetch.js | 91 +++++++++++++++++++++++++++++++---- lib/media/streaming_engine.js | 7 ++- 2 files changed, 86 insertions(+), 12 deletions(-) diff --git a/lib/media/segment_prefetch.js b/lib/media/segment_prefetch.js index 240237f1a3..dcf47d7166 100644 --- a/lib/media/segment_prefetch.js +++ b/lib/media/segment_prefetch.js @@ -38,7 +38,7 @@ shaka.media.SegmentPrefetch = class { /** * @private {!Map.} + * !shaka.media.SegmentPrefetchOperation>} */ this.segmentPrefetchMap_ = new Map(); } @@ -68,8 +68,10 @@ shaka.media.SegmentPrefetch = class { while (this.segmentPrefetchMap_.size < this.prefetchLimit_ && reference != null) { if (!this.segmentPrefetchMap_.has(reference)) { - const op = this.fetchDispatcher_(reference, this.stream_); - this.segmentPrefetchMap_.set(reference, op); + const segmentPrefetchOperation = + new shaka.media.SegmentPrefetchOperation(this.fetchDispatcher_); + segmentPrefetchOperation.dispatchFetch(reference, this.stream_); + this.segmentPrefetchMap_.set(reference, segmentPrefetchOperation); } this.prefetchPosTime_ = reference.startTime; reference = iterator.next().value; @@ -79,10 +81,11 @@ shaka.media.SegmentPrefetch = class { /** * Get the result of prefetched segment if already exists. * @param {(!shaka.media.SegmentReference)} reference + * @param {?function(BufferSource):!Promise=} streamDataCallback * @return {?shaka.net.NetworkingEngine.PendingRequest} op * @public */ - getPrefetchedSegment(reference) { + getPrefetchedSegment(reference, streamDataCallback) { goog.asserts.assert(this.prefetchLimit_ > 0, 'SegmentPrefetch can not be used when prefetchLimit <= 0.'); goog.asserts.assert(reference instanceof shaka.media.SegmentReference, @@ -91,13 +94,16 @@ shaka.media.SegmentPrefetch = class { const logPrefix = shaka.media.SegmentPrefetch.logPrefix_(this.stream_); if (this.segmentPrefetchMap_.has(reference)) { - const op = this.segmentPrefetchMap_.get(reference); + const segmentPrefetchOperation = this.segmentPrefetchMap_.get(reference); + if (streamDataCallback) { + segmentPrefetchOperation.setStreamDataCallback(streamDataCallback); + } this.segmentPrefetchMap_.delete(reference); shaka.log.debug( logPrefix, 'reused prefetched segment at time:', reference.startTime, 'mapSize', this.segmentPrefetchMap_.size); - return op; + return segmentPrefetchOperation.getOperation(); } else { shaka.log.debug( logPrefix, @@ -163,10 +169,10 @@ shaka.media.SegmentPrefetch = class { */ abortPrefetchedSegment_(reference) { const logPrefix = shaka.media.SegmentPrefetch.logPrefix_(this.stream_); - const operation = this.segmentPrefetchMap_.get(reference); + const segmentPrefetchOperation = this.segmentPrefetchMap_.get(reference); this.segmentPrefetchMap_.delete(reference); - if (operation) { - operation.abort(); + if (segmentPrefetchOperation) { + segmentPrefetchOperation.abort(); shaka.log.info( logPrefix, 'pop and abort prefetched segment at time:', reference.startTime); @@ -183,10 +189,75 @@ shaka.media.SegmentPrefetch = class { } }; +/** + * @summary + * This class manages a segment prefetch operation. + */ +shaka.media.SegmentPrefetchOperation = class { + /** + * @param {shaka.media.SegmentPrefetch.FetchDispatcher} fetchDispatcher + */ + constructor(fetchDispatcher) { + /** @private {shaka.media.SegmentPrefetch.FetchDispatcher} */ + this.fetchDispatcher_ = fetchDispatcher; + + /** @private {?function(BufferSource):!Promise} */ + this.streamDataCallback_ = null; + + /** @private {?shaka.net.NetworkingEngine.PendingRequest} */ + this.operation_ = null; + } + + /** + * Fetch a segments + * + * @param {!shaka.media.SegmentReference} + * reference + * @param {!shaka.extern.Stream} stream + * @public + */ + dispatchFetch(reference, stream) { + this.operation_ = this.fetchDispatcher_( + reference, stream, async (data) => { + if (this.streamDataCallback_) { + await this.streamDataCallback_(data); + } + }); + } + + /** + * Get the operation of prefetched segment if already exists. + * + * @return {?shaka.net.NetworkingEngine.PendingRequest} op + * @public + */ + getOperation() { + return this.operation_; + } + + /** + * @param {?function(BufferSource):!Promise} streamDataCallback + * @public + */ + setStreamDataCallback(streamDataCallback) { + this.streamDataCallback_ = streamDataCallback; + } + + /** + * Abort the current operation if exists. + */ + abort() { + if (this.operation_) { + this.operation_.abort(); + } + } +}; + /** * @typedef {function( * !(shaka.media.InitSegmentReference|shaka.media.SegmentReference), - * shaka.extern.Stream + * shaka.extern.Stream, + * ?function(BufferSource):!Promise= * ):!shaka.net.NetworkingEngine.PendingRequest} * * @description diff --git a/lib/media/streaming_engine.js b/lib/media/streaming_engine.js index 6268366fb4..d654166c90 100644 --- a/lib/media/streaming_engine.js +++ b/lib/media/streaming_engine.js @@ -901,7 +901,9 @@ shaka.media.StreamingEngine = class { return new shaka.media.SegmentPrefetch( this.config_.segmentPrefetchLimit, stream, - (reference, stream) => this.dispatchFetch_(reference, stream, null), + (reference, stream, streamDataCallback) => { + return this.dispatchFetch_(reference, stream, streamDataCallback); + }, ); } return null; @@ -2162,7 +2164,8 @@ shaka.media.StreamingEngine = class { mediaState.segmentPrefetch && reference instanceof shaka.media.SegmentReference ) { - op = mediaState.segmentPrefetch.getPrefetchedSegment(reference); + op = mediaState.segmentPrefetch.getPrefetchedSegment( + reference, streamDataCallback); } if (!op) { op = this.dispatchFetch_(