Skip to content

Commit

Permalink
Adds support to accept transfers in sink.enqueue method.
Browse files Browse the repository at this point in the history
  • Loading branch information
mukulmishra18 committed Jul 7, 2017
1 parent 4ad80e1 commit a13a4f8
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 29 deletions.
25 changes: 18 additions & 7 deletions src/core/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,9 @@ IPDFStreamRangeReader.prototype = {
/** @implements {IPDFStream} */
function PDFNetworkStream(data, msgHandler) {
this.data = data;
this.msgHandler = msgHandler;
let source = data.source;
this._contentLength = source.length;
this.msgHandler = msgHandler;
this._fullRequestReader = null;
this._rangeRequestReaders = [];
}
Expand Down Expand Up @@ -241,12 +241,12 @@ function PDFNetworkStreamReader(networkStream) {
this._isStreamingSupported = false;

let readableStream = this._msgHandler.sendWithStream('GetReader',
this._data);
this._data);

this._reader = readableStream.getReader();

this._headersReady = this._msgHandler.sendWithPromise('ReaderHeadersReady').
then((data) => {
then((data) => {
this._isStreamingSupported = data.isStreamingSupported;
this._isRangeSupported = data.isRangeSupported;
this._contentLength = data.contentLength;
Expand All @@ -271,7 +271,14 @@ PDFNetworkStreamReader.prototype = {
},

read() {
return this._reader.read();
return this._reader.read().then(function({ value, done, }) {
if (done) {
return { value: undefined, done: true, };
}
// `value` is wrapped into Uint8Array, we need to
// unwrap it to ArrayBuffer for further processing.
return { value: value.buffer, done: false, };
});
},

cancel(reason) {
Expand All @@ -282,11 +289,10 @@ PDFNetworkStreamReader.prototype = {
/** @implements {IPDFStreamRangeReader} */
function PDFNetworkStreamRangeReader(begin, end, networkStream) {
this._msgHandler = networkStream.msgHandler;
this._data = networkStream.data;
this.onProgress = null;

let readableStream = this._msgHandler.sendWithStream('GetRangeReader',
{ data: this._data, begin, end, });
{ begin, end, });

this._reader = readableStream.getReader();
}
Expand All @@ -297,7 +303,12 @@ PDFNetworkStreamRangeReader.prototype = {
},

read() {
return this._reader.read();
return this._reader.read().then(function({ value, done, }) {
if (done) {
return { value: undefined, done: true, };
}
return { value: value.buffer, done: false, };
});
},

cancel(reason) {
Expand Down
29 changes: 12 additions & 17 deletions src/display/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -1232,9 +1232,8 @@ var PDFWorker = (function PDFWorkerClosure() {
// pdf.worker.js file is needed.
if (typeof PDFJSDev === 'undefined' || !PDFJSDev.test('PRODUCTION')) {
if (typeof SystemJS === 'object') {
Promise.all([SystemJS.import('pdfjs/core/network'),
SystemJS.import('pdfjs/core/worker')]).then((modules) => {
var worker = modules[1];
Promise.resolve(SystemJS.import('pdfjs/core/worker')).
then((worker) => {
WorkerMessageHandler = worker.WorkerMessageHandler;
fakeWorkerFilesLoadedCapability.resolve(WorkerMessageHandler);
});
Expand All @@ -1248,7 +1247,6 @@ var PDFWorker = (function PDFWorkerClosure() {
}
} else if (PDFJSDev.test('SINGLE_FILE')) {
var pdfjsCoreWorker = require('../core/worker.js');
require('../core/network.js');
WorkerMessageHandler = pdfjsCoreWorker.WorkerMessageHandler;
fakeWorkerFilesLoadedCapability.resolve(WorkerMessageHandler);
} else {
Expand Down Expand Up @@ -1611,15 +1609,15 @@ var WorkerTransport = (function WorkerTransportClosure() {
this._fullReader = this._PDFNetworkStream.getFullReader();

sink.onPull = () => {
this._fullReader.read().then(function(result) {
if (result.done) {
this._fullReader.read().then(function({ value, done, }) {
if (done) {
sink.close();
return;
}
sink.enqueue(result.value);
}).catch(function(reason) {
sink.error(reason);
});
// Enqueue data chunk into sink, and transfer it
// to other side as `Transferable` object.
sink.enqueue(new Uint8Array(value), 1, [value]);
}).catch(sink.error);
};

sink.onCancel = (reason) => {
Expand All @@ -1641,20 +1639,17 @@ var WorkerTransport = (function WorkerTransportClosure() {
}, this);

messageHandler.on('GetRangeReader', function(data, sink) {
this._PDFNetworkStream = new PDFNetworkStream(data.data);
this._rangeReader =
this._PDFNetworkStream.getRangeReader(data.begin, data.end);

sink.onPull = () => {
this._rangeReader.read().then(function(result) {
if (result.done) {
this._rangeReader.read().then(function({ value, done, }) {
if (done) {
sink.close();
return;
}
sink.enqueue(result.value);
}).catch(function(reason) {
sink.error(reason);
});
sink.enqueue(new Uint8Array(value), 1, [value]);
}).catch(sink.error);
};

sink.onCancel = (reason) => {
Expand Down
11 changes: 6 additions & 5 deletions src/shared/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -1433,13 +1433,14 @@ MessageHandler.prototype = {
let targetName = data.sourceName;
let capability = createPromiseCapability();

let sendStreamRequest = ({ stream, chunk, success, reason, }) => {
this.comObj.postMessage({ sourceName, targetName, stream, streamId,
chunk, success, reason, });
let sendStreamRequest = ({ stream, chunk, transfers,
success, reason, }) => {
this.postMessage({ sourceName, targetName, stream, streamId,
chunk, success, reason, }, transfers);
};

let streamSink = {
enqueue(chunk, size = 1) {
enqueue(chunk, size = 1, transfers) {
let lastDesiredSize = this.desiredSize;
this.desiredSize -= size;
// Enqueue decreases the desiredSize property of sink,
Expand All @@ -1449,7 +1450,7 @@ MessageHandler.prototype = {
this.sinkCapability = createPromiseCapability();
this.ready = this.sinkCapability.promise;
}
sendStreamRequest({ stream: 'enqueue', chunk, });
sendStreamRequest({ stream: 'enqueue', chunk, transfers, });
},

close() {
Expand Down

0 comments on commit a13a4f8

Please sign in to comment.