Skip to content

Commit

Permalink
Adds Streams API support for networking task of PDF.js project.
Browse files Browse the repository at this point in the history
network.js file moved to main thread and `PDFNetworkStream` implemented
at worker thread, that is used to ask for data whenever worker needs.
  • Loading branch information
mukulmishra18 committed Jul 27, 2017
1 parent bd8c121 commit ec6de1a
Show file tree
Hide file tree
Showing 9 changed files with 479 additions and 238 deletions.
257 changes: 74 additions & 183 deletions src/core/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -198,218 +198,120 @@ IPDFStreamRangeReader.prototype = {

/** @implements {IPDFStream} */
var PDFWorkerStream = (function PDFWorkerStreamClosure() {
function PDFWorkerStream(params, msgHandler) {
this._queuedChunks = [];
var initialData = params.initialData;
if (initialData && initialData.length > 0) {
this._queuedChunks.push(initialData);
}
function PDFWorkerStream(msgHandler) {
this._msgHandler = msgHandler;

this._isRangeSupported = !(params.disableRange);
this._isStreamingSupported = !(params.disableStream);
this._contentLength = params.length;

this._contentLength = null;
this._fullRequestReader = null;
this._rangeReaders = [];

msgHandler.on('OnDataRange', this._onReceiveData.bind(this));
msgHandler.on('OnDataProgress', this._onProgress.bind(this));
this._rangeRequestReaders = [];
}
PDFWorkerStream.prototype = {
_onReceiveData: function PDFWorkerStream_onReceiveData(args) {
if (args.begin === undefined) {
if (this._fullRequestReader) {
this._fullRequestReader._enqueue(args.chunk);
} else {
this._queuedChunks.push(args.chunk);
}
} else {
var found = this._rangeReaders.some(function (rangeReader) {
if (rangeReader._begin !== args.begin) {
return false;
}
rangeReader._enqueue(args.chunk);
return true;
});
assert(found);
}
},

_onProgress: function PDFWorkerStream_onProgress(evt) {
if (this._rangeReaders.length > 0) {
// Reporting to first range reader.
var firstReader = this._rangeReaders[0];
if (firstReader.onProgress) {
firstReader.onProgress({ loaded: evt.loaded, });
}
}
},

_removeRangeReader: function PDFWorkerStream_removeRangeReader(reader) {
var i = this._rangeReaders.indexOf(reader);
if (i >= 0) {
this._rangeReaders.splice(i, 1);
}
},

getFullReader: function PDFWorkerStream_getFullReader() {
getFullReader() {
assert(!this._fullRequestReader);
var queuedChunks = this._queuedChunks;
this._queuedChunks = null;
return new PDFWorkerStreamReader(this, queuedChunks);
this._fullRequestReader = new PDFWorkerStreamReader(this._msgHandler);
return this._fullRequestReader;
},

getRangeReader: function PDFWorkerStream_getRangeReader(begin, end) {
var reader = new PDFWorkerStreamRangeReader(this, begin, end);
this._msgHandler.send('RequestDataRange', { begin, end, });
this._rangeReaders.push(reader);
getRangeReader(begin, end) {
let reader = new PDFWorkerStreamRangeReader(begin, end, this._msgHandler);
this._rangeRequestReaders.push(reader);
return reader;
},

cancelAllRequests: function PDFWorkerStream_cancelAllRequests(reason) {
cancelAllRequests(reason) {
if (this._fullRequestReader) {
this._fullRequestReader.cancel(reason);
}
var readers = this._rangeReaders.slice(0);
readers.forEach(function (rangeReader) {
rangeReader.cancel(reason);
let readers = this._rangeRequestReaders.slice(0);
readers.forEach(function (reader) {
reader.cancel(reason);
});
},
};

/** @implements {IPDFStreamReader} */
function PDFWorkerStreamReader(stream, queuedChunks) {
this._stream = stream;
this._done = false;
this._queuedChunks = queuedChunks || [];
this._requests = [];
this._headersReady = Promise.resolve();
stream._fullRequestReader = this;

this.onProgress = null; // not used
function PDFWorkerStreamReader(msgHandler) {
this._msgHandler = msgHandler;

this._contentLength = null;
this._isRangeSupported = false;
this._isStreamingSupported = false;

let readableStream = this._msgHandler.sendWithStream('GetReader');

this._reader = readableStream.getReader();

this._headersReady = this._msgHandler.sendWithPromise('ReaderHeadersReady').
then((data) => {
this._isStreamingSupported = data.isStreamingSupported;
this._isRangeSupported = data.isRangeSupported;
this._contentLength = data.contentLength;
});
}
PDFWorkerStreamReader.prototype = {
_enqueue: function PDFWorkerStreamReader_enqueue(chunk) {
if (this._done) {
return; // ignore new data
}
if (this._requests.length > 0) {
var requestCapability = this._requests.shift();
requestCapability.resolve({ value: chunk, done: false, });
return;
}
this._queuedChunks.push(chunk);
},

get headersReady() {
return this._headersReady;
},

get isRangeSupported() {
return this._stream._isRangeSupported;
get contentLength() {
return this._contentLength;
},

get isStreamingSupported() {
return this._stream._isStreamingSupported;
return this._isStreamingSupported;
},

get contentLength() {
return this._stream._contentLength;
get isRangeSupported() {
return this._isRangeSupported;
},

read: function PDFWorkerStreamReader_read() {
if (this._queuedChunks.length > 0) {
var chunk = this._queuedChunks.shift();
return Promise.resolve({ value: chunk, done: false, });
}
if (this._done) {
return Promise.resolve({ value: undefined, done: true, });
}
var requestCapability = createPromiseCapability();
this._requests.push(requestCapability);
return requestCapability.promise;
read() {
return this._reader.read().then(function({ value, done, }) {
if (done) {
return { value: undefined, done: true, };
}
// `value` is wrapped into Uint8Array, we need to
// unwrap it to ArrayBuffer for further processing.
return { value: value.buffer, done: false, };
});
},

cancel: function PDFWorkerStreamReader_cancel(reason) {
this._done = true;
this._requests.forEach(function (requestCapability) {
requestCapability.resolve({ value: undefined, done: true, });
});
this._requests = [];
cancel(reason) {
this._reader.cancel(reason);
},
};

/** @implements {IPDFStreamRangeReader} */
function PDFWorkerStreamRangeReader(stream, begin, end) {
this._stream = stream;
this._begin = begin;
this._end = end;
this._queuedChunk = null;
this._requests = [];
this._done = false;

function PDFWorkerStreamRangeReader(begin, end, msgHandler) {
this._msgHandler = msgHandler;
this.onProgress = null;

let readableStream = this._msgHandler.sendWithStream('GetRangeReader',
{ begin, end, });

this._reader = readableStream.getReader();
}
PDFWorkerStreamRangeReader.prototype = {
_enqueue: function PDFWorkerStreamRangeReader_enqueue(chunk) {
if (this._done) {
return; // ignore new data
}
if (this._requests.length === 0) {
this._queuedChunk = chunk;
} else {
var requestsCapability = this._requests.shift();
requestsCapability.resolve({ value: chunk, done: false, });
this._requests.forEach(function (requestCapability) {
requestCapability.resolve({ value: undefined, done: true, });
});
this._requests = [];
}
this._done = true;
this._stream._removeRangeReader(this);
},

get isStreamingSupported() {
return false;
},

read: function PDFWorkerStreamRangeReader_read() {
if (this._queuedChunk) {
return Promise.resolve({ value: this._queuedChunk, done: false, });
}
if (this._done) {
return Promise.resolve({ value: undefined, done: true, });
}
var requestCapability = createPromiseCapability();
this._requests.push(requestCapability);
return requestCapability.promise;
read() {
return this._reader.read().then(function({ value, done, }) {
if (done) {
return { value: undefined, done: true, };
}
return { value: value.buffer, done: false, };
});
},

cancel: function PDFWorkerStreamRangeReader_cancel(reason) {
this._done = true;
this._requests.forEach(function (requestCapability) {
requestCapability.resolve({ value: undefined, done: true, });
});
this._requests = [];
this._stream._removeRangeReader(this);
cancel(reason) {
this._reader.cancel(reason);
},
};

return PDFWorkerStream;
})();

/** @type IPDFStream */
var PDFNetworkStream;

/**
* Sets PDFNetworkStream class to be used as alternative PDF data transport.
* @param {IPDFStream} cls - the PDF data transport.
*/
function setPDFNetworkStreamClass(cls) {
PDFNetworkStream = cls;
}

var WorkerMessageHandler = {
setup(handler, port) {
var testMessageProcessed = false;
Expand Down Expand Up @@ -536,35 +438,16 @@ var WorkerMessageHandler = {
return pdfManagerCapability.promise;
}

var pdfStream;
var pdfStream, cachedChunks = [];
try {
if (source.chunkedViewerLoading) {
pdfStream = new PDFWorkerStream(source, handler);
} else {
if (!PDFNetworkStream) {
throw new Error('./network module is not loaded');
}
pdfStream = new PDFNetworkStream(data);
}
pdfStream = new PDFWorkerStream(handler);
} catch (ex) {
pdfManagerCapability.reject(ex);
return pdfManagerCapability.promise;
}

var fullRequest = pdfStream.getFullReader();
fullRequest.headersReady.then(function () {
if (!fullRequest.isStreamingSupported ||
!fullRequest.isRangeSupported) {
// If stream or range are disabled, it's our only way to report
// loading progress.
fullRequest.onProgress = function (evt) {
handler.send('DocProgress', {
loaded: evt.loaded,
total: evt.total,
});
};
}

if (!fullRequest.isRangeSupported) {
return;
}
Expand All @@ -580,14 +463,23 @@ var WorkerMessageHandler = {
disableAutoFetch,
rangeChunkSize: source.rangeChunkSize,
}, evaluatorOptions, docBaseUrl);
// There may be a chance that `pdfManager` is not initialized
// for first few runs of `readchunk` block of code. Be sure
// to send all cached chunks, if any, to chunked_stream via
// pdf_manager.
for (let i = 0; i < cachedChunks.length; i++) {
pdfManager.sendProgressiveData(cachedChunks[i]);
}

cachedChunks = [];
pdfManagerCapability.resolve(pdfManager);
cancelXHRs = null;
}).catch(function (reason) {
pdfManagerCapability.reject(reason);
cancelXHRs = null;
});

var cachedChunks = [], loaded = 0;
var loaded = 0;
var flushChunks = function () {
var pdfFile = arraysToBytes(cachedChunks);
if (source.length && pdfFile.length !== source.length) {
Expand Down Expand Up @@ -969,7 +861,6 @@ if (typeof window === 'undefined' && !isNodeJS() &&
}

export {
setPDFNetworkStreamClass,
WorkerTask,
WorkerMessageHandler,
};
Loading

0 comments on commit ec6de1a

Please sign in to comment.