From df6fe1865b25308ba88ce85f756ee47c91483543 Mon Sep 17 00:00:00 2001 From: lorenzo Date: Fri, 7 Apr 2017 15:37:31 +0300 Subject: [PATCH 1/2] added cfs tempstore and worker to fix concurrency issues --- packages/tempstore/.npm/package/.gitignore | 1 + packages/tempstore/.npm/package/README | 7 + .../.npm/package/npm-shrinkwrap.json | 14 + packages/tempstore/.travis.yml | 5 + packages/tempstore/.versions | 29 ++ packages/tempstore/CHANGELOG.md | 169 ++++++++ packages/tempstore/LICENSE.md | 20 + packages/tempstore/README.md | 24 ++ packages/tempstore/api.md | 112 +++++ packages/tempstore/internal.api.md | 225 ++++++++++ packages/tempstore/package.js | 34 ++ packages/tempstore/tempStore.js | 399 ++++++++++++++++++ packages/tempstore/tests/server-tests.js | 39 ++ packages/worker/.travis.yml | 5 + packages/worker/.versions | 32 ++ packages/worker/CHANGELOG.md | 123 ++++++ packages/worker/LICENSE.md | 20 + packages/worker/README.md | 8 + packages/worker/api.md | 38 ++ packages/worker/fileWorker.js | 186 ++++++++ packages/worker/internal.api.md | 143 +++++++ packages/worker/package.js | 36 ++ packages/worker/tests/client-tests.js | 44 ++ packages/worker/tests/server-tests.js | 49 +++ 24 files changed, 1762 insertions(+) create mode 100644 packages/tempstore/.npm/package/.gitignore create mode 100644 packages/tempstore/.npm/package/README create mode 100644 packages/tempstore/.npm/package/npm-shrinkwrap.json create mode 100644 packages/tempstore/.travis.yml create mode 100644 packages/tempstore/.versions create mode 100644 packages/tempstore/CHANGELOG.md create mode 100644 packages/tempstore/LICENSE.md create mode 100644 packages/tempstore/README.md create mode 100644 packages/tempstore/api.md create mode 100644 packages/tempstore/internal.api.md create mode 100644 packages/tempstore/package.js create mode 100644 packages/tempstore/tempStore.js create mode 100644 packages/tempstore/tests/server-tests.js create mode 100644 packages/worker/.travis.yml create mode 100644 packages/worker/.versions create mode 100644 packages/worker/CHANGELOG.md create mode 100644 packages/worker/LICENSE.md create mode 100644 packages/worker/README.md create mode 100644 packages/worker/api.md create mode 100644 packages/worker/fileWorker.js create mode 100644 packages/worker/internal.api.md create mode 100644 packages/worker/package.js create mode 100644 packages/worker/tests/client-tests.js create mode 100644 packages/worker/tests/server-tests.js diff --git a/packages/tempstore/.npm/package/.gitignore b/packages/tempstore/.npm/package/.gitignore new file mode 100644 index 00000000000..3c3629e647f --- /dev/null +++ b/packages/tempstore/.npm/package/.gitignore @@ -0,0 +1 @@ +node_modules diff --git a/packages/tempstore/.npm/package/README b/packages/tempstore/.npm/package/README new file mode 100644 index 00000000000..3d492553a43 --- /dev/null +++ b/packages/tempstore/.npm/package/README @@ -0,0 +1,7 @@ +This directory and the files immediately inside it are automatically generated +when you change this package's NPM dependencies. Commit the files in this +directory (npm-shrinkwrap.json, .gitignore, and this README) to source control +so that others run the same versions of sub-dependencies. + +You should NOT check in the node_modules directory that Meteor automatically +creates; if you are using git, the .gitignore file tells git to ignore it. diff --git a/packages/tempstore/.npm/package/npm-shrinkwrap.json b/packages/tempstore/.npm/package/npm-shrinkwrap.json new file mode 100644 index 00000000000..80fbce75b66 --- /dev/null +++ b/packages/tempstore/.npm/package/npm-shrinkwrap.json @@ -0,0 +1,14 @@ +{ + "dependencies": { + "combined-stream": { + "version": "0.0.4", + "resolved": "https://registry.npmjs.org/combined-stream/-/combined-stream-0.0.4.tgz", + "from": "combined-stream@0.0.4" + }, + "delayed-stream": { + "version": "0.0.5", + "resolved": "https://registry.npmjs.org/delayed-stream/-/delayed-stream-0.0.5.tgz", + "from": "delayed-stream@0.0.5" + } + } +} diff --git a/packages/tempstore/.travis.yml b/packages/tempstore/.travis.yml new file mode 100644 index 00000000000..6a464003387 --- /dev/null +++ b/packages/tempstore/.travis.yml @@ -0,0 +1,5 @@ +language: node_js +node_js: + - "0.10" +before_install: + - "curl -L http://git.io/s0Zu-w | /bin/sh" \ No newline at end of file diff --git a/packages/tempstore/.versions b/packages/tempstore/.versions new file mode 100644 index 00000000000..8fdf6def2dd --- /dev/null +++ b/packages/tempstore/.versions @@ -0,0 +1,29 @@ +base64@1.0.3 +binary-heap@1.0.3 +callback-hook@1.0.3 +cfs:base-package@0.0.30 +cfs:data-man@0.0.6 +cfs:file@0.1.16 +cfs:storage-adapter@0.2.1 +cfs:tempstore@0.1.5 +check@1.0.5 +ddp@1.1.0 +deps@1.0.7 +ejson@1.0.6 +geojson-utils@1.0.3 +http@1.1.0 +id-map@1.0.3 +json@1.0.3 +livedata@1.0.13 +logging@1.0.7 +meteor@1.1.6 +minimongo@1.0.8 +mongo@1.1.0 +mongo-livedata@1.0.8 +ordered-dict@1.0.3 +raix:eventemitter@0.1.1 +random@1.0.3 +retry@1.0.3 +tracker@1.0.7 +underscore@1.0.3 +url@1.0.4 diff --git a/packages/tempstore/CHANGELOG.md b/packages/tempstore/CHANGELOG.md new file mode 100644 index 00000000000..64b4f4394f6 --- /dev/null +++ b/packages/tempstore/CHANGELOG.md @@ -0,0 +1,169 @@ +# Changelog + +## vCurrent +## [v0.1.2] (https://github.com/CollectionFS/Meteor-cfs-tempstore/tree/v0.1.2) +#### 17/12/14 by Morten Henriksen +## [v0.1.1] (https://github.com/CollectionFS/Meteor-cfs-tempstore/tree/v0.1.1) +#### 17/12/14 by Morten Henriksen +- mbr update, remove versions.json + +- Bump to version 0.1.1 + +## [v0.1.0] (https://github.com/CollectionFS/Meteor-cfs-tempstore/tree/v0.1.0) +#### 17/12/14 by Morten Henriksen +- mbr update versions and fix warnings + +- fix 0.9.1 package scope + +- don't rely on package names; fix for 0.9.1 + +- 0.9.1 support + +## [v0.0.29] (https://github.com/CollectionFS/Meteor-cfs-tempstore/tree/v0.0.29) +#### 28/08/14 by Morten Henriksen +- Meteor Package System Update + +## [v0.0.28] (https://github.com/CollectionFS/Meteor-cfs-tempstore/tree/v0.0.28) +#### 27/08/14 by Eric Dobbertin +- change package name to lowercase + +## [v0.0.27] (https://github.com/CollectionFS/Meteor-cfs-tempstore/tree/v0.0.27) +#### 17/06/14 by Eric Dobbertin +- add `FS.TempStore.removeAll` method + +## [v0.0.26] (https://github.com/CollectionFS/Meteor-cfs-tempstore/tree/v0.0.26) +#### 30/04/14 by Eric Dobbertin +## [v0.0.25] (https://github.com/CollectionFS/Meteor-cfs-tempstore/tree/v0.0.25) +#### 30/04/14 by Eric Dobbertin +- use third-party combined-stream node pkg as attempt to resolve pesky streaming issues + +## [v0.0.24] (https://github.com/CollectionFS/Meteor-cfs-tempstore/tree/v0.0.24) +#### 29/04/14 by Eric Dobbertin +- generate api docs + +- fileKey methods now expect an FS.File always, so we give them one + +- small FS.File API change + +## [v0.0.23] (https://github.com/CollectionFS/Meteor-cfs-tempstore/tree/v0.0.23) +#### 12/04/14 by Eric Dobbertin +- test for packages since we're assigning default error functions for stores + +## [v0.0.22] (https://github.com/CollectionFS/Meteor-cfs-tempstore/tree/v0.0.22) +#### 12/04/14 by Eric Dobbertin +- avoid errors if file already removed from temp store + +## [v0.0.21] (https://github.com/CollectionFS/Meteor-cfs-tempstore/tree/v0.0.21) +#### 12/04/14 by Eric Dobbertin +## [v0.0.20] (https://github.com/CollectionFS/Meteor-cfs-tempstore/tree/v0.0.20) +#### 08/04/14 by Eric Dobbertin +- cleanup stored/uploaded events and further improve chunk tracking + +## [v0.0.19] (https://github.com/CollectionFS/Meteor-cfs-tempstore/tree/v0.0.19) +#### 08/04/14 by Eric Dobbertin +- use internal tracking collection + +- Have TempStore set the size + +- Add the SA on stored result + +- allow unset chunkSum + +## [v0.0.18] (https://github.com/CollectionFS/Meteor-cfs-tempstore/tree/v0.0.18) +#### 06/04/14 by Eric Dobbertin +- delete chunkCount and chunkSize props from fileObj after upload is complete + +## [v0.0.17] (https://github.com/CollectionFS/Meteor-cfs-tempstore/tree/v0.0.17) +#### 06/04/14 by Eric Dobbertin +- We now wait to mount storage until it's needed (first upload begins); this ensures that we are able to accurately check for the cfs-worker package, which loads after this one. It also makes the code a bit cleaner. + +## [v0.0.16] (https://github.com/CollectionFS/Meteor-cfs-tempstore/tree/v0.0.16) +#### 04/04/14 by Morten Henriksen +- Temporary workaround: We currently we generate a mongoId if gridFS is used for TempStore + +- Note: At the moment tempStore will only use gridfs if no filesystem is installed + +## [v0.0.15] (https://github.com/CollectionFS/Meteor-cfs-tempstore/tree/v0.0.15) +#### 02/04/14 by Morten Henriksen +- Use the stored event and object instead (result object is not used at the moment - but we could store an id at some point) + +## [v0.0.14] (https://github.com/CollectionFS/Meteor-cfs-tempstore/tree/v0.0.14) +#### 31/03/14 by Eric Dobbertin +- use latest releases + +- use latest releases + +## [v0.0.13] (https://github.com/CollectionFS/Meteor-cfs-tempstore/tree/v0.0.13) +#### 31/03/14 by Morten Henriksen +- Try to use latest when using weak deps + +## [v0.0.12] (https://github.com/CollectionFS/Meteor-cfs-tempstore/tree/v0.0.12) +#### 30/03/14 by Morten Henriksen +## [v0.0.11] (https://github.com/CollectionFS/Meteor-cfs-tempstore/tree/v0.0.11) +#### 30/03/14 by Morten Henriksen +- Set noon callback - we just want the file gone + +## [v0.0.10] (https://github.com/CollectionFS/Meteor-cfs-tempstore/tree/v0.0.10) +#### 29/03/14 by Morten Henriksen +- add filesystem and gridfs as weak deps + +## [v0.0.9] (https://github.com/CollectionFS/Meteor-cfs-tempstore/tree/v0.0.9) +#### 29/03/14 by Morten Henriksen +- Add check to see if FS.TempStore.Storage is set + +## [v0.0.8] (https://github.com/CollectionFS/Meteor-cfs-tempstore/tree/v0.0.8) +#### 29/03/14 by Morten Henriksen +- Converting TempStore to use SA api + +## [v0.0.7] (https://github.com/CollectionFS/Meteor-cfs-tempstore/tree/v0.0.7) +#### 25/03/14 by Morten Henriksen +- use `new Date` + +- Have TempStore emit relevant events + +## [v0.0.6] (https://github.com/CollectionFS/Meteor-cfs-tempstore/tree/v0.0.6) +#### 23/03/14 by Morten Henriksen +- Rollback to specific git dependency + +- use collectionFS travis version force update + +## [v0.0.5] (https://github.com/CollectionFS/Meteor-cfs-tempstore/tree/v0.0.5) +#### 22/03/14 by Morten Henriksen +- try to fix travis test by using general package references + +## [v0.0.4] (https://github.com/CollectionFS/Meteor-cfs-tempstore/tree/v0.0.4) +#### 21/03/14 by Morten Henriksen +- fix chunk files not actually being deleted + +## [v0.0.3] (https://github.com/CollectionFS/Meteor-cfs-tempstore/tree/v0.0.3) +#### 18/03/14 by Morten Henriksen +- * TempStore is now an EventEmitter * progress event * uploaded * (start) should perhaps be created * remove * Added FS.TempStore.listParts - will return lookup object listing the parts already uploaded + +- Allow chunk to be undefined an thereby have the createWriteStream follow normal streaming api + +- Allow undefined in chunkPath + +- added comments + +- bug hunting + +- Add streaming WIP + +- rename temp store collection to 'cfs.tempstore' + +- fix ensureForFile + +- track tempstore chunks in our own collection rather than in the file object + +- change to accept buffer; less converting + +- prevent bytesUploaded from getting bigger than size + +## [v0.0.2] (https://github.com/CollectionFS/Meteor-cfs-tempstore/tree/v0.0.2) +#### 15/02/14 by Morten Henriksen +- fix typo + +## [v0.0.1] (https://github.com/CollectionFS/Meteor-cfs-tempstore/tree/v0.0.1) +#### 13/02/14 by Morten Henriksen +- init commit + diff --git a/packages/tempstore/LICENSE.md b/packages/tempstore/LICENSE.md new file mode 100644 index 00000000000..1a3820821f4 --- /dev/null +++ b/packages/tempstore/LICENSE.md @@ -0,0 +1,20 @@ +The MIT License (MIT) + +Copyright (c) 2013 [@raix](https://github.com/raix) and [@aldeed](https://github.com/aldeed), aka Morten N.O. Nørgaard Henriksen, mh@gi-software.com + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/packages/tempstore/README.md b/packages/tempstore/README.md new file mode 100644 index 00000000000..62ae16e78f7 --- /dev/null +++ b/packages/tempstore/README.md @@ -0,0 +1,24 @@ +cfs:tempstore +========================= + +This is a Meteor package used by +[CollectionFS](https://github.com/CollectionFS/Meteor-CollectionFS). It provides +an API for quickly storing chunks of file data in temporary files. If also supports deleting those chunks, and combining them into one +binary object and attaching it to an FS.File instance. + +You don't need to manually add this package to your app, but you could replace +this package with your own if you want to handle temporary storage in another +way. + +> `FS.TempStore` uses the `cfs:storage-adapter` compatible Storage Adapters, both `FS.Store.FileSystem` and `FS.Store.GridFS` will be defaulted. *for more information read the [internal.api.md](internal.api.md)* + +##Documentation +[API Documentation](api.md) + +##Contribute +Here's the [complete API documentation](internal.api.md), including private methods. + +Update docs, `npm install docmeteor` +```bash +$ docmeteor +``` \ No newline at end of file diff --git a/packages/tempstore/api.md b/packages/tempstore/api.md new file mode 100644 index 00000000000..cbb34632e6b --- /dev/null +++ b/packages/tempstore/api.md @@ -0,0 +1,112 @@ +## cfs-tempstore Public API ## + +CollectionFS, temporary storage + +_API documentation automatically generated by [docmeteor](https://github.com/raix/docmeteor)._ + +##Temporary Storage + +Temporary storage is used for chunked uploads until all chunks are received +and all copies have been made or given up. In some cases, the original file +is stored only in temporary storage (for example, if all copies do some +manipulation in beforeSave). This is why we use the temporary file as the +basis for each saved copy, and then remove it after all copies are saved. + +Every chunk is saved as an individual temporary file. This is safer than +attempting to write multiple incoming chunks to different positions in a +single temporary file, which can lead to write conflicts. + +Using temp files also allows us to easily resume uploads, even if the server +restarts, and to keep the working memory clear. +The FS.TempStore emits events that others are able to listen to +- + +### *fs*.TempStore {object}  Server ### + +*This property __TempStore__ is defined in `FS`* +it's an event emitter* + +> ```FS.TempStore = new EventEmitter();``` [tempStore.js:28](tempStore.js#L28) + + + +- +We will not mount a storage adapter until needed. This allows us to check for the +existance of FS.FileWorker, which is loaded after this package because it +depends on this package. + +- +XXX: TODO +FS.TempStore.on('stored', function(fileObj, chunkCount, result) { +This should work if we pass on result from the SA on stored event... +fileObj.update({ $set: { chunkSum: 1, chunkCount: chunkCount, size: result.size } }); +}); +Stream implementation +- + +### *fsTempstore*.removeFile(fileObj)  Server ### + +*This method __removeFile__ is defined in `FS.TempStore`* + +__Arguments__ + +* __fileObj__ *{[FS.File](#FS.File)}* + +This function removes the file from tempstorage - it cares not if file is +already removed or not found, goal is reached anyway. + +> ```FS.TempStore.removeFile = function(fileObj) { ...``` [tempStore.js:169](tempStore.js#L169) + + +- + +### *fsTempstore*.createWriteStream(fileObj, [options])  Server ### + +*This method __createWriteStream__ is defined in `FS.TempStore`* + +__Arguments__ + +* __fileObj__ *{[FS.File](#FS.File)}* + + File to store in temporary storage + +* __options__ *{[Number ](#Number )|[ String](# String)}* (Optional) + +__Returns__ *{Stream}* +Writeable stream + + +`options` of different types mean differnt things: +`undefined` We store the file in one part +(Normal server-side api usage)* +`Number` the number is the part number total +(multipart uploads will use this api)* +`String` the string is the name of the `store` that wants to store file data +(stores that want to sync their data to the rest of the files stores will use this)* + +> Note: fileObj must be mounted on a `FS.Collection`, it makes no sense to store otherwise + +> ```FS.TempStore.createWriteStream = function(fileObj, options) { ...``` [tempStore.js:217](tempStore.js#L217) + + +- + +### *fsTempstore*.createReadStream(fileObj)  Server ### + +*This method __createReadStream__ is defined in `FS.TempStore`* + +__Arguments__ + +* __fileObj__ *{[FS.File](#FS.File)}* + + The file to read + + +__Returns__ *{Stream}* +Returns readable stream + + + +> ```FS.TempStore.createReadStream = function(fileObj) { ...``` [tempStore.js:313](tempStore.js#L313) + + diff --git a/packages/tempstore/internal.api.md b/packages/tempstore/internal.api.md new file mode 100644 index 00000000000..4b6e1f6d5ee --- /dev/null +++ b/packages/tempstore/internal.api.md @@ -0,0 +1,225 @@ +## Public and Private API ## + +_API documentation automatically generated by [docmeteor](https://github.com/raix/docmeteor)._ + +*** + +__File: ["tempStore.js"](tempStore.js) Where: {server}__ + +*** + +##Temporary Storage + +Temporary storage is used for chunked uploads until all chunks are received +and all copies have been made or given up. In some cases, the original file +is stored only in temporary storage (for example, if all copies do some +manipulation in beforeSave). This is why we use the temporary file as the +basis for each saved copy, and then remove it after all copies are saved. + +Every chunk is saved as an individual temporary file. This is safer than +attempting to write multiple incoming chunks to different positions in a +single temporary file, which can lead to write conflicts. + +Using temp files also allows us to easily resume uploads, even if the server +restarts, and to keep the working memory clear. +The FS.TempStore emits events that others are able to listen to +- + +### *fs*.TempStore {object}  Server ### + +*This property __TempStore__ is defined in `FS`* +it's an event emitter* + +> ```FS.TempStore = new EventEmitter();``` [tempStore.js:28](tempStore.js#L28) + + +- + +### *fsTempstore*.Storage {StorageAdapter}  Server ### + +*This property is private* +*This property __Storage__ is defined in `FS.TempStore`* + +This property is set to either `FS.Store.FileSystem` or `FS.Store.GridFS` + +__When and why:__ +We normally default to `cfs-filesystem` unless its not installed. *(we default to gridfs if installed)* +But if `cfs-gridfs` and `cfs-worker` is installed we default to `cfs-gridfs` + +If `cfs-gridfs` and `cfs-filesystem` is not installed we log a warning. +the user can set `FS.TempStore.Storage` them selfs eg.: +```js +// Its important to set `internal: true` this lets the SA know that we +// are using this internally and it will give us direct SA api +FS.TempStore.Storage = new FS.Store.GridFS('_tempstore', { internal: true }); +``` + +> Note: This is considered as `advanced` use, its not a common pattern. + +> ```FS.TempStore.Storage = null;``` [tempStore.js:54](tempStore.js#L54) + + + +- +We will not mount a storage adapter until needed. This allows us to check for the +existance of FS.FileWorker, which is loaded after this package because it +depends on this package. + +- +XXX: TODO +FS.TempStore.on('stored', function(fileObj, chunkCount, result) { +This should work if we pass on result from the SA on stored event... +fileObj.update({ $set: { chunkSum: 1, chunkCount: chunkCount, size: result.size } }); +}); +Stream implementation +- + +### _chunkPath([n])  Server ### + +*This method is private* + +__Arguments__ + +* __n__ *{Number}* (Optional) + + Chunk number + + +__Returns__ *{String}* +Chunk naming convention + + +> ```_chunkPath = function(n) { ...``` [tempStore.js:104](tempStore.js#L104) + + +- + +### _fileReference(fileObj, chunk)  Server ### + +*This method is private* + +__Arguments__ + +* __fileObj__ *{[FS.File](#FS.File)}* +* __chunk__ *{Number}* + +__Returns__ *{String}* +Generated SA specific fileKey for the chunk + + +Note: Calling function should call mountStorage() first, and +make sure that fileObj is mounted. + +> ```_fileReference = function(fileObj, chunk, existing) { ...``` [tempStore.js:118](tempStore.js#L118) + + +- + +### *fsTempstore*.exists(File)  Server ### + +*This method __exists__ is defined in `FS.TempStore`* + +__Arguments__ + +* __File__ *{[FS.File](#FS.File)}* + + object + + +__Returns__ *{Boolean}* +Is this file, or parts of it, currently stored in the TempStore + + +> ```FS.TempStore.exists = function(fileObj) { ...``` [tempStore.js:145](tempStore.js#L145) + + +- + +### *fsTempstore*.listParts(fileObj)  Server ### + +*This method __listParts__ is defined in `FS.TempStore`* + +__Arguments__ + +* __fileObj__ *{[FS.File](#FS.File)}* + +__Returns__ *{Object}* +of parts already stored + +__TODO__ +``` +* This is not yet implemented, milestone 1.1.0 +``` + + +> ```FS.TempStore.listParts = function(fileObj) { ...``` [tempStore.js:156](tempStore.js#L156) + + +- + +### *fsTempstore*.removeFile(fileObj)  Server ### + +*This method __removeFile__ is defined in `FS.TempStore`* + +__Arguments__ + +* __fileObj__ *{[FS.File](#FS.File)}* + +This function removes the file from tempstorage - it cares not if file is +already removed or not found, goal is reached anyway. + +> ```FS.TempStore.removeFile = function(fileObj) { ...``` [tempStore.js:169](tempStore.js#L169) + + +- + +### *fsTempstore*.createWriteStream(fileObj, [options])  Server ### + +*This method __createWriteStream__ is defined in `FS.TempStore`* + +__Arguments__ + +* __fileObj__ *{[FS.File](#FS.File)}* + + File to store in temporary storage + +* __options__ *{[Number ](#Number )|[ String](# String)}* (Optional) + +__Returns__ *{Stream}* +Writeable stream + + +`options` of different types mean differnt things: +`undefined` We store the file in one part +(Normal server-side api usage)* +`Number` the number is the part number total +(multipart uploads will use this api)* +`String` the string is the name of the `store` that wants to store file data +(stores that want to sync their data to the rest of the files stores will use this)* + +> Note: fileObj must be mounted on a `FS.Collection`, it makes no sense to store otherwise + +> ```FS.TempStore.createWriteStream = function(fileObj, options) { ...``` [tempStore.js:217](tempStore.js#L217) + + +- + +### *fsTempstore*.createReadStream(fileObj)  Server ### + +*This method __createReadStream__ is defined in `FS.TempStore`* + +__Arguments__ + +* __fileObj__ *{[FS.File](#FS.File)}* + + The file to read + + +__Returns__ *{Stream}* +Returns readable stream + + + +> ```FS.TempStore.createReadStream = function(fileObj) { ...``` [tempStore.js:313](tempStore.js#L313) + + diff --git a/packages/tempstore/package.js b/packages/tempstore/package.js new file mode 100644 index 00000000000..f339ef51a00 --- /dev/null +++ b/packages/tempstore/package.js @@ -0,0 +1,34 @@ + Package.describe({ + git: 'https://github.com/CollectionFS/Meteor-cfs-tempstore.git', + name: 'cfs:tempstore', + version: '0.1.5', + summary: 'CollectionFS, temporary storage' +}); + +Npm.depends({ + 'combined-stream': '0.0.4' +}); + +Package.onUse(function(api) { + api.versionsFrom('1.0'); + + api.use(['cfs:base-package@0.0.30', 'cfs:file@0.1.16', "ecmascript"]); + + api.use('cfs:filesystem@0.1.2', { weak: true }); + api.use('cfs:gridfs@0.0.30', { weak: true }); + + api.use('mongo'); + + api.addFiles([ + 'tempStore.js' + ], 'server'); +}); + +// Package.on_test(function (api) { +// api.use('collectionfs'); +// api.use('test-helpers', 'server'); +// api.use(['tinytest', 'underscore', 'ejson', 'ordered-dict', +// 'random', 'deps']); + +// api.addFiles('tests/server-tests.js', 'server'); +// }); diff --git a/packages/tempstore/tempStore.js b/packages/tempstore/tempStore.js new file mode 100644 index 00000000000..b372dd0580d --- /dev/null +++ b/packages/tempstore/tempStore.js @@ -0,0 +1,399 @@ +// ##Temporary Storage +// +// Temporary storage is used for chunked uploads until all chunks are received +// and all copies have been made or given up. In some cases, the original file +// is stored only in temporary storage (for example, if all copies do some +// manipulation in beforeSave). This is why we use the temporary file as the +// basis for each saved copy, and then remove it after all copies are saved. +// +// Every chunk is saved as an individual temporary file. This is safer than +// attempting to write multiple incoming chunks to different positions in a +// single temporary file, which can lead to write conflicts. +// +// Using temp files also allows us to easily resume uploads, even if the server +// restarts, and to keep the working memory clear. + +// The FS.TempStore emits events that others are able to listen to +var EventEmitter = Npm.require('events').EventEmitter; + +// We have a special stream concating all chunk files into one readable stream +var CombinedStream = Npm.require('combined-stream'); + +/** @namespace FS.TempStore + * @property FS.TempStore + * @type {object} + * @public + * *it's an event emitter* + */ +FS.TempStore = new EventEmitter(); + +// Create a tracker collection for keeping track of all chunks for any files that are currently in the temp store +var tracker = FS.TempStore.Tracker = new Mongo.Collection('cfs._tempstore.chunks'); + +/** + * @property FS.TempStore.Storage + * @type {StorageAdapter} + * @namespace FS.TempStore + * @private + * This property is set to either `FS.Store.FileSystem` or `FS.Store.GridFS` + * + * __When and why:__ + * We normally default to `cfs-filesystem` unless its not installed. *(we default to gridfs if installed)* + * But if `cfs-gridfs` and `cfs-worker` is installed we default to `cfs-gridfs` + * + * If `cfs-gridfs` and `cfs-filesystem` is not installed we log a warning. + * the user can set `FS.TempStore.Storage` them selfs eg.: + * ```js + * // Its important to set `internal: true` this lets the SA know that we + * // are using this internally and it will give us direct SA api + * FS.TempStore.Storage = new FS.Store.GridFS('_tempstore', { internal: true }); + * ``` + * + * > Note: This is considered as `advanced` use, its not a common pattern. + */ +FS.TempStore.Storage = null; + +// We will not mount a storage adapter until needed. This allows us to check for the +// existance of FS.FileWorker, which is loaded after this package because it +// depends on this package. +function mountStorage() { + + if (FS.TempStore.Storage) return; + + // XXX: We could replace this test, testing the FS scope for grifFS etc. + // This is on the todo later when we get "stable" + if (Package["cfs:gridfs"] && (Package["cfs:worker"] || !Package["cfs:filesystem"])) { + // If the file worker is installed we would prefer to use the gridfs sa + // for scalability. We also default to gridfs if filesystem is not found + + // Use the gridfs + FS.TempStore.Storage = new FS.Store.GridFS('_tempstore', { internal: true }); + } else if (Package["cfs:filesystem"]) { + + // use the Filesystem + FS.TempStore.Storage = new FS.Store.FileSystem('_tempstore', { internal: true }); + } else { + throw new Error('FS.TempStore.Storage is not set: Install cfs:filesystem or cfs:gridfs or set it manually'); + } + + FS.debug && console.log('TempStore is mounted on', FS.TempStore.Storage.typeName); +} + +function mountFile(fileObj, name) { + if (!fileObj.isMounted()) { + throw new Error(name + ' cannot work with unmounted file'); + } +} + +// We update the fileObj on progress +FS.TempStore.on('progress', function(fileObj, chunkNum, count, total, result) { + FS.debug && console.log('TempStore progress: Received ' + count + ' of ' + total + ' chunks for ' + fileObj.name()); +}); + +// XXX: TODO +// FS.TempStore.on('stored', function(fileObj, chunkCount, result) { +// // This should work if we pass on result from the SA on stored event... +// fileObj.update({ $set: { chunkSum: 1, chunkCount: chunkCount, size: result.size } }); +// }); + +// Stream implementation + +/** + * @method _chunkPath + * @private + * @param {Number} [n] Chunk number + * @returns {String} Chunk naming convention + */ +_chunkPath = function(n) { + return (n || 0) + '.chunk'; +}; + +/** + * @method _fileReference + * @param {FS.File} fileObj + * @param {Number} chunk + * @private + * @returns {String} Generated SA specific fileKey for the chunk + * + * Note: Calling function should call mountStorage() first, and + * make sure that fileObj is mounted. + */ +_fileReference = function(fileObj, chunk, existing) { + // Maybe it's a chunk we've already saved + existing = existing || tracker.findOne({fileId: fileObj._id, collectionName: fileObj.collectionName}); + + // Make a temporary fileObj just for fileKey generation + var tempFileObj = new FS.File({ + collectionName: fileObj.collectionName, + _id: fileObj._id, + original: { + name: _chunkPath(chunk) + }, + copies: { + _tempstore: { + key: existing && existing.keys[chunk] + } + } + }); + + // Return a fitting fileKey SA specific + return FS.TempStore.Storage.adapter.fileKey(tempFileObj); +}; + +/** + * @method FS.TempStore.exists + * @param {FS.File} File object + * @returns {Boolean} Is this file, or parts of it, currently stored in the TempStore + */ +FS.TempStore.exists = function(fileObj) { + var existing = tracker.findOne({fileId: fileObj._id, collectionName: fileObj.collectionName}); + return !!existing; +}; + +/** + * @method FS.TempStore.listParts + * @param {FS.File} fileObj + * @returns {Object} of parts already stored + * @todo This is not yet implemented, milestone 1.1.0 + */ +FS.TempStore.listParts = function fsTempStoreListParts(fileObj) { + var self = this; + console.warn('This function is not correctly implemented using SA in TempStore'); + //XXX This function might be necessary for resume. Not currently supported. +}; + +/** + * @method FS.TempStore.removeFile + * @public + * @param {FS.File} fileObj + * This function removes the file from tempstorage - it cares not if file is + * already removed or not found, goal is reached anyway. + */ +FS.TempStore.removeFile = function fsTempStoreRemoveFile(fileObj) { + var self = this; + + // Ensure that we have a storage adapter mounted; if not, throw an error. + mountStorage(); + + // If fileObj is not mounted or can't be, throw an error + mountFile(fileObj, 'FS.TempStore.removeFile'); + + // Emit event + self.emit('remove', fileObj); + + var chunkInfo = tracker.findOne({ + fileId: fileObj._id, + collectionName: fileObj.collectionName + }); + + if (chunkInfo) { + + // Unlink each file + FS.Utility.each(chunkInfo.keys || {}, function (key, chunk) { + var fileKey = _fileReference(fileObj, chunk, chunkInfo); + FS.TempStore.Storage.adapter.remove(fileKey, FS.Utility.noop); + }); + + // Remove fileObj from tracker collection, too + tracker.remove({_id: chunkInfo._id}); + + } +}; + +/** + * @method FS.TempStore.removeAll + * @public + * This function removes all files from tempstorage - it cares not if file is + * already removed or not found, goal is reached anyway. + */ +FS.TempStore.removeAll = function fsTempStoreRemoveAll() { + var self = this; + + // Ensure that we have a storage adapter mounted; if not, throw an error. + mountStorage(); + + tracker.find().forEach(function (chunkInfo) { + // Unlink each file + FS.Utility.each(chunkInfo.keys || {}, function (key, chunk) { + var fileKey = _fileReference({_id: chunkInfo.fileId, collectionName: chunkInfo.collectionName}, chunk, chunkInfo); + FS.TempStore.Storage.adapter.remove(fileKey, FS.Utility.noop); + }); + + // Remove from tracker collection, too + tracker.remove({_id: chunkInfo._id}); + }); +}; + +/** + * @method FS.TempStore.createWriteStream + * @public + * @param {FS.File} fileObj File to store in temporary storage + * @param {Number | String} [options] + * @returns {Stream} Writeable stream + * + * `options` of different types mean differnt things: + * * `undefined` We store the file in one part + * *(Normal server-side api usage)* + * * `Number` the number is the part number total + * *(multipart uploads will use this api)* + * * `String` the string is the name of the `store` that wants to store file data + * *(stores that want to sync their data to the rest of the files stores will use this)* + * + * > Note: fileObj must be mounted on a `FS.Collection`, it makes no sense to store otherwise + */ +FS.TempStore.createWriteStream = function(fileObj, options) { + var self = this; + + // Ensure that we have a storage adapter mounted; if not, throw an error. + mountStorage(); + + // If fileObj is not mounted or can't be, throw an error + mountFile(fileObj, 'FS.TempStore.createWriteStream'); + + // Cache the selector for use multiple times below + var selector = {fileId: fileObj._id, collectionName: fileObj.collectionName}; + + // TODO, should pass in chunkSum so we don't need to use FS.File for it + var chunkSum = fileObj.chunkSum || 1; + + // Add fileObj to tracker collection + tracker.upsert(selector, {$setOnInsert: {keys: {}}}); + + // Determine how we're using the writeStream + var isOnePart = false, isMultiPart = false, isStoreSync = false, chunkNum = 0; + if (options === +options) { + isMultiPart = true; + chunkNum = options; + } else if (options === ''+options) { + isStoreSync = true; + } else { + isOnePart = true; + } + + // XXX: it should be possible for a store to sync by storing data into the + // tempstore - this could be done nicely by setting the store name as string + // in the chunk variable? + // This store name could be passed on the the fileworker via the uploaded + // event + // So the uploaded event can return: + // undefined - if data is stored into and should sync out to all storage adapters + // number - if a chunk has been uploaded + // string - if a storage adapter wants to sync its data to the other SA's + + // Find a nice location for the chunk data + var fileKey = _fileReference(fileObj, chunkNum); + + // Create the stream as Meteor safe stream + var writeStream = FS.TempStore.Storage.adapter.createWriteStream(fileKey); + + // When the stream closes we update the chunkCount + writeStream.safeOn('stored', function(result) { + // Save key in tracker document + var setObj = {}; + setObj['keys.' + chunkNum] = result.fileKey; + tracker.update(selector, {$set: setObj}); + + + var temp = tracker.findOne(selector); + + if(!temp){ + FS.debug && console.log('NOT FOUND FROM TEMPSTORE => EXIT (REMOVED)'); + return; + } + + // Get updated chunkCount + var chunkCount = FS.Utility.size(temp.keys); + + // Progress + self.emit('progress', fileObj, chunkNum, chunkCount, chunkSum, result); + + // If upload is completed + if (chunkCount === chunkSum) { + // We no longer need the chunk info + var modifier = { $set: {}, $unset: {chunkCount: 1, chunkSum: 1, chunkSize: 1} }; + + if(!fileObj.instance_id) + modifier.$set.instance_id = process.env.COLLECTIONFS_ENV_NAME_UNIQUE_ID ? process.env[process.env.COLLECTIONFS_ENV_NAME_UNIQUE_ID] : process.env.METEOR_PARENT_PID; + + // Check if the file has been uploaded before + if (typeof fileObj.uploadedAt === 'undefined') { + // We set the uploadedAt date + modifier.$set.uploadedAt = new Date(); + } else { + // We have been uploaded so an event were file data is updated is + // called synchronizing - so this must be a synchronizedAt? + modifier.$set.synchronizedAt = new Date(); + } + + // Update the fileObject + fileObj.update(modifier); + + // Fire ending events + var eventName = isStoreSync ? 'synchronized' : 'stored'; + self.emit(eventName, fileObj, result); + + // XXX is emitting "ready" necessary? + self.emit('ready', fileObj, chunkCount, result); + } else { + + var modifier = { $set: {}}; + if(!fileObj.instance_id) + modifier.$set.instance_id = process.env.COLLECTIONFS_ENV_NAME_UNIQUE_ID ? process.env[process.env.COLLECTIONFS_ENV_NAME_UNIQUE_ID] : process.env.METEOR_PARENT_PID; + + modifier.$set.chunkCount = chunkCount; + + fileObj.update(modifier); + } + }); + + // Emit errors + writeStream.on('error', function (error) { + FS.debug && console.log('TempStore writeStream error:', error); + self.emit('error', error, fileObj); + }); + + return writeStream; +}; + +/** + * @method FS.TempStore.createReadStream + * @public + * @param {FS.File} fileObj The file to read + * @return {Stream} Returns readable stream + * + */ +FS.TempStore.createReadStream = function(fileObj) { + // Ensure that we have a storage adapter mounted; if not, throw an error. + mountStorage(); + + // If fileObj is not mounted or can't be, throw an error + mountFile(fileObj, 'FS.TempStore.createReadStream'); + + FS.debug && console.log('FS.TempStore creating read stream for ' + fileObj._id); + + // Determine how many total chunks there are from the tracker collection + var chunkInfo = tracker.findOne({fileId: fileObj._id, collectionName: fileObj.collectionName}) || {}; + var totalChunks = FS.Utility.size(chunkInfo.keys); + + function getNextStreamFunc(chunk) { + return Meteor.bindEnvironment(function(next) { + var fileKey = _fileReference(fileObj, chunk); + var chunkReadStream = FS.TempStore.Storage.adapter.createReadStream(fileKey); + next(chunkReadStream); + }, function (error) { + throw error; + }); + } + + // Make a combined stream + var combinedStream = CombinedStream.create(); + + // Add each chunk stream to the combined stream when the previous chunk stream ends + var currentChunk = 0; + for (var chunk = 0; chunk < totalChunks; chunk++) { + combinedStream.append(getNextStreamFunc(chunk)); + } + + // Return the combined stream + return combinedStream; +}; diff --git a/packages/tempstore/tests/server-tests.js b/packages/tempstore/tests/server-tests.js new file mode 100644 index 00000000000..242aa89ca3a --- /dev/null +++ b/packages/tempstore/tests/server-tests.js @@ -0,0 +1,39 @@ +function equals(a, b) { + return !!(EJSON.stringify(a) === EJSON.stringify(b)); +} + +Tinytest.add('cfs-tempstore - server - test environment', function(test) { + test.isTrue(typeof FS.Collection !== 'undefined', 'test environment not initialized FS.Collection'); +}); + +/* + * This is a server-only package so only server tests are needed. + * Need to test each API method: + * FS.TempStore.saveChunk + * FS.TempStore.getDataForFile + * FS.TempStore.getDataForFileSync + * FS.TempStore.deleteChunks + * FS.TempStore.ensureForFile + * + */ + + +//Test API: +//test.isFalse(v, msg) +//test.isTrue(v, msg) +//test.equalactual, expected, message, not +//test.length(obj, len) +//test.include(s, v) +//test.isNaN(v, msg) +//test.isUndefined(v, msg) +//test.isNotNull +//test.isNull +//test.throws(func) +//test.instanceOf(obj, klass) +//test.notEqual(actual, expected, message) +//test.runId() +//test.exception(exception) +//test.expect_fail() +//test.ok(doc) +//test.fail(doc) +//test.equal(a, b, msg) diff --git a/packages/worker/.travis.yml b/packages/worker/.travis.yml new file mode 100644 index 00000000000..6a464003387 --- /dev/null +++ b/packages/worker/.travis.yml @@ -0,0 +1,5 @@ +language: node_js +node_js: + - "0.10" +before_install: + - "curl -L http://git.io/s0Zu-w | /bin/sh" \ No newline at end of file diff --git a/packages/worker/.versions b/packages/worker/.versions new file mode 100644 index 00000000000..3d899f64021 --- /dev/null +++ b/packages/worker/.versions @@ -0,0 +1,32 @@ +base64@1.0.3 +binary-heap@1.0.3 +callback-hook@1.0.3 +cfs:base-package@0.0.30 +cfs:data-man@0.0.6 +cfs:file@0.1.16 +cfs:power-queue@0.9.11 +cfs:reactive-property@0.0.4 +cfs:storage-adapter@0.2.1 +cfs:tempstore@0.1.4 +cfs:worker@0.1.4 +check@1.0.5 +ddp@1.1.0 +deps@1.0.7 +ejson@1.0.6 +geojson-utils@1.0.3 +http@1.1.0 +id-map@1.0.3 +json@1.0.3 +livedata@1.0.13 +logging@1.0.7 +meteor@1.1.6 +minimongo@1.0.8 +mongo@1.1.0 +mongo-livedata@1.0.8 +ordered-dict@1.0.3 +raix:eventemitter@0.1.1 +random@1.0.3 +retry@1.0.3 +tracker@1.0.7 +underscore@1.0.3 +url@1.0.4 diff --git a/packages/worker/CHANGELOG.md b/packages/worker/CHANGELOG.md new file mode 100644 index 00000000000..3899e6848cb --- /dev/null +++ b/packages/worker/CHANGELOG.md @@ -0,0 +1,123 @@ +# Changelog + +## vCurrent +## [v0.1.2] (https://github.com/CollectionFS/Meteor-cfs-worker/tree/v0.1.2) +#### 17/12/14 by Morten Henriksen +## [v0.1.1] (https://github.com/CollectionFS/Meteor-cfs-worker/tree/v0.1.1) +#### 17/12/14 by Morten Henriksen +- Bump to version 0.1.1 + +- mbr update, remove versions.json + +## [v0.1.0] (https://github.com/CollectionFS/Meteor-cfs-worker/tree/v0.1.0) +#### 17/12/14 by Morten Henriksen +- mbr update versions and fix warnings + +- *Merged pull-request:* "Remove the unused function makeSafeCallback()." [#4](https://github.com/CollectionFS/Meteor-cfs-worker/issues/4) ([DouglasUrner](https://github.com/DouglasUrner)) + +- Remove the unused function makeSafeCallback(). + +- *Merged pull-request:* "Minor formatting edit." [#2](https://github.com/CollectionFS/Meteor-cfs-worker/issues/2) ([DouglasUrner](https://github.com/DouglasUrner)) + +- Minor formatting edit. + +- 0.9.1 support + +Patches by GitHub user [@DouglasUrner](https://github.com/DouglasUrner). + +## [v0.0.20] (https://github.com/CollectionFS/Meteor-cfs-worker/tree/v0.0.20) +#### 28/08/14 by Morten Henriksen +- Meteor Package System Update + +## [v0.0.19] (https://github.com/CollectionFS/Meteor-cfs-worker/tree/v0.0.19) +#### 27/08/14 by Eric Dobbertin +## [v0.0.18] (https://github.com/CollectionFS/Meteor-cfs-worker/tree/v0.0.18) +#### 27/08/14 by Eric Dobbertin +- Merge branch 'master' of https://github.com/CollectionFS/Meteor-cfs-worker + +- change package name to lowercase + +## [v0.0.17] (https://github.com/CollectionFS/Meteor-cfs-worker/tree/v0.0.17) +#### 09/08/14 by Eric Dobbertin +- *Merged pull-request:* "Fixed bug preventing temp chunks deletion" [#1](https://github.com/CollectionFS/Meteor-cfs-worker/issues/1) ([GuillaumeZuff](https://github.com/GuillaumeZuff)) + +- Fixed bug preventing temp chunks deletion + +Patches by GitHub user [@GuillaumeZuff](https://github.com/GuillaumeZuff). + +## [v0.0.16] (https://github.com/CollectionFS/Meteor-cfs-worker/tree/v0.0.16) +#### 06/04/14 by Eric Dobbertin +- use uploadedAt so that we can remove chunk info when it's no longer needed + +## [v0.0.15] (https://github.com/CollectionFS/Meteor-cfs-worker/tree/v0.0.15) +#### 05/04/14 by Morten Henriksen +## [v0.0.14] (https://github.com/CollectionFS/Meteor-cfs-worker/tree/v0.0.14) +#### 31/03/14 by Eric Dobbertin +- use latest releases + +## [v0.0.13] (https://github.com/CollectionFS/Meteor-cfs-worker/tree/v0.0.13) +#### 29/03/14 by Morten Henriksen +- remove underscore deps + +## [v0.0.12] (https://github.com/CollectionFS/Meteor-cfs-worker/tree/v0.0.12) +#### 29/03/14 by Morten Henriksen +- Refactoring and clean ups + +## [v0.0.11] (https://github.com/CollectionFS/Meteor-cfs-worker/tree/v0.0.11) +#### 23/03/14 by Morten Henriksen +- Rollback to specific git dependency + +- use collectionFS travis version force update + +## [v0.0.10] (https://github.com/CollectionFS/Meteor-cfs-worker/tree/v0.0.10) +#### 22/03/14 by Morten Henriksen +- try to fix travis test by using general package references + +## [v0.0.9] (https://github.com/CollectionFS/Meteor-cfs-worker/tree/v0.0.9) +#### 22/03/14 by Morten Henriksen +- out factor fileobj update when file is stored + +- clean up and use the correct end event for streams + +- reference released collectionFS pkg + +## [v0.0.8] (https://github.com/CollectionFS/Meteor-cfs-worker/tree/v0.0.8) +#### 21/03/14 by Morten Henriksen +## [v0.0.7] (https://github.com/CollectionFS/Meteor-cfs-worker/tree/v0.0.7) +#### 18/03/14 by Eric Dobbertin +- Remove from temp store when removed from collection + +## [v0.0.6] (https://github.com/CollectionFS/Meteor-cfs-worker/tree/v0.0.6) +#### 18/03/14 by Morten Henriksen +- fix refactor name for removeFile + +- add back code for running beforeSave function, with stream support (just getting it working for now, can be switched to transform streams later) + +- changed method name + +- minor changes using chunks in file record + +- Add streaming WIP + +## [v0.0.5] (https://github.com/CollectionFS/Meteor-cfs-worker/tree/v0.0.5) +#### 07/03/14 by Eric Dobbertin +- should be installing devel + +- small change because tempstore no longer tracks chunks in the file object + +## [v0.0.4] (https://github.com/CollectionFS/Meteor-cfs-worker/tree/v0.0.4) +#### 03/03/14 by Eric Dobbertin +- moved beforeSave out of SA and into here + +- move saveCopy here, out of fs.collection + +- just package and doc tweaks + +## [v0.0.3] (https://github.com/CollectionFS/Meteor-cfs-worker/tree/v0.0.3) +#### 15/02/14 by Morten Henriksen +## [v0.0.2] (https://github.com/CollectionFS/Meteor-cfs-worker/tree/v0.0.2) +#### 13/02/14 by Morten Henriksen +## [v0.0.1] (https://github.com/CollectionFS/Meteor-cfs-worker/tree/v0.0.1) +#### 13/02/14 by Morten Henriksen +- init commit + diff --git a/packages/worker/LICENSE.md b/packages/worker/LICENSE.md new file mode 100644 index 00000000000..1a3820821f4 --- /dev/null +++ b/packages/worker/LICENSE.md @@ -0,0 +1,20 @@ +The MIT License (MIT) + +Copyright (c) 2013 [@raix](https://github.com/raix) and [@aldeed](https://github.com/aldeed), aka Morten N.O. Nørgaard Henriksen, mh@gi-software.com + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/packages/worker/README.md b/packages/worker/README.md new file mode 100644 index 00000000000..b21022b95e6 --- /dev/null +++ b/packages/worker/README.md @@ -0,0 +1,8 @@ +cfs:worker +========================= + +This is a Meteor package used by +[CollectionFS](https://github.com/CollectionFS/Meteor-CollectionFS). + +You don't need to manually add this package to your app. It is added when you +add the `cfs:standard-packages` package. \ No newline at end of file diff --git a/packages/worker/api.md b/packages/worker/api.md new file mode 100644 index 00000000000..cab1605d50a --- /dev/null +++ b/packages/worker/api.md @@ -0,0 +1,38 @@ +## cfs-worker Public API ## + +CollectionFS, file worker - handles file copies/versions + +_API documentation automatically generated by [docmeteor](https://github.com/raix/docmeteor)._ + +TODO: Use power queue to handle throttling etc. +Use observe to monitor changes and have it create tasks for the power queue +to perform. +- + +### *fs*.FileWorker Object  Server ### + +*This property __FileWorker__ is defined in `FS`* + + +> ```FS.FileWorker = { ...``` [fileWorker.js:9](fileWorker.js#L9) + + +- + +### *fsFileworker*.observe(fsCollection)  Server ### + +*This method __observe__ is defined in `FS.FileWorker`* + +__Arguments__ + +* __fsCollection__ *{[FS.Collection](#FS.Collection)}* + +__Returns__ *{undefined}* + + +Sets up observes on the fsCollection to store file copies and delete +temp files at the appropriate times. + +> ```FS.FileWorker.observe = function(fsCollection) { ...``` [fileWorker.js:20](fileWorker.js#L20) + + diff --git a/packages/worker/fileWorker.js b/packages/worker/fileWorker.js new file mode 100644 index 00000000000..7a150243d84 --- /dev/null +++ b/packages/worker/fileWorker.js @@ -0,0 +1,186 @@ +//// TODO: Use power queue to handle throttling etc. +//// Use observe to monitor changes and have it create tasks for the power queue +//// to perform. + +/** + * @public + * @type Object + */ +FS.FileWorker = {}; + +/** + * @method FS.FileWorker.observe + * @public + * @param {FS.Collection} fsCollection + * @returns {undefined} + * + * Sets up observes on the fsCollection to store file copies and delete + * temp files at the appropriate times. + */ +FS.FileWorker.observe = function(fsCollection) { + + // Initiate observe for finding newly uploaded/added files that need to be stored + // per store. + FS.Utility.each(fsCollection.options.stores, function(store) { + var storeName = store.name; + fsCollection.files.find(getReadyQuery(storeName), { + fields: { + copies: 0 + } + }).observe({ + added: function(fsFile) { + // added will catch fresh files + FS.debug && console.log("FileWorker ADDED - calling saveCopy", storeName, "for", fsFile._id); + saveCopy(fsFile, storeName); + }, + changed: function(fsFile) { + // changed will catch failures and retry them + FS.debug && console.log("FileWorker CHANGED - calling saveCopy", storeName, "for", fsFile._id); + saveCopy(fsFile, storeName); + } + }); + }); + + // Initiate observe for finding files that have been stored so we can delete + // any temp files + fsCollection.files.find(getDoneQuery(fsCollection.options.stores)).observe({ + added: function(fsFile) { + FS.debug && console.log("FileWorker ADDED - calling deleteChunks for", fsFile._id); + try { + FS.TempStore.removeFile(fsFile); + }catch(err) { + console.log(err); + } + } + }); + + // Initiate observe for catching files that have been removed and + // removing the data from all stores as well + fsCollection.files.find().observe({ + removed: function(fsFile) { + FS.debug && console.log('FileWorker REMOVED - removing all stored data for', fsFile._id); + //remove from temp store + FS.TempStore.removeFile(fsFile); + //delete from all stores + FS.Utility.each(fsCollection.options.stores, function(storage) { + storage.adapter.remove(fsFile); + }); + } + }); +}; + +/** + * @method getReadyQuery + * @private + * @param {string} storeName - The name of the store to observe + * + * Returns a selector that will be used to identify files that + * have been uploaded but have not yet been stored to the + * specified store. + * + * { + * uploadedAt: {$exists: true}, + * 'copies.storeName`: null, + * 'failures.copies.storeName.doneTrying': {$ne: true} + * } + */ +function getReadyQuery(storeName) { + var selector = {uploadedAt: {$exists: true}}; + selector['copies.' + storeName] = null; + selector['failures.copies.' + storeName + '.doneTrying'] = {$ne: true}; + return selector; +} + +/** + * @method getDoneQuery + * @private + * @param {Array} stores - The stores array from the FS.Collection options + * + * Returns a selector that will be used to identify files where all + * stores have successfully save or have failed the + * max number of times but still have chunks. The resulting selector + * should be something like this: + * + * { + * $and: [ + * {chunks: {$exists: true}}, + * { + * $or: [ + * { + * $and: [ + * { + * 'copies.storeName': {$ne: null} + * }, + * { + * 'copies.storeName': {$ne: false} + * } + * ] + * }, + * { + * 'failures.copies.storeName.doneTrying': true + * } + * ] + * }, + * REPEATED FOR EACH STORE + * ] + * } + * + */ +function getDoneQuery(stores) { + var selector = { + $and: [{chunks: {$exists: true}}] + }; + + // Add conditions for all defined stores + FS.Utility.each(stores, function(store) { + var storeName = store.name; + var copyCond = {$or: [{$and: []}]}; + var tempCond = {}; + tempCond["copies." + storeName] = {$ne: null}; + copyCond.$or[0].$and.push(tempCond); + tempCond = {}; + tempCond["copies." + storeName] = {$ne: false}; + copyCond.$or[0].$and.push(tempCond); + tempCond = {}; + tempCond['failures.copies.' + storeName + '.doneTrying'] = true; + copyCond.$or.push(tempCond); + selector.$and.push(copyCond); + }) + + return selector; +} + +/** + * @method saveCopy + * @private + * @param {FS.File} fsFile + * @param {string} storeName + * @param {Object} options + * @param {Boolean} [options.overwrite=false] - Force save to the specified store? + * @returns {undefined} + * + * Saves to the specified store. If the + * `overwrite` option is `true`, will save to the store even if we already + * have, potentially overwriting any previously saved data. Synchronous. + */ +function saveCopy(fsFile, storeName, options) { + options = options || {}; + + var storage = FS.StorageAdapter(storeName); + if (!storage) { + throw new Error('No store named "' + storeName + '" exists'); + } + + FS.debug && console.log('saving to store ' + storeName); + + try { + + var writeStream = storage.adapter.createWriteStream(fsFile); + var readStream = FS.TempStore.createReadStream(fsFile); + + // Pipe the temp data into the storage adapter + readStream.pipe(writeStream); + }catch(err){ + console.log(err); + } +} diff --git a/packages/worker/internal.api.md b/packages/worker/internal.api.md new file mode 100644 index 00000000000..92cce610b69 --- /dev/null +++ b/packages/worker/internal.api.md @@ -0,0 +1,143 @@ +## Public and Private API ## + +_API documentation automatically generated by [docmeteor](https://github.com/raix/docmeteor)._ + +*** + +__File: ["fileWorker.js"](fileWorker.js) Where: {server}__ + +*** + +TODO: Use power queue to handle throttling etc. +Use observe to monitor changes and have it create tasks for the power queue +to perform. + +- + +### *fs*.FileWorker Object  Server ### + +*This property __FileWorker__ is defined in `FS`* + + +> ```FS.FileWorker = { ...``` [fileWorker.js:9](fileWorker.js#L9) + + +- + +### *fsFileworker*.observe(fsCollection)  Server ### + +*This method __observe__ is defined in `FS.FileWorker`* + +__Arguments__ + +* __fsCollection__ *{[FS.Collection](#FS.Collection)}* + +__Returns__ *{undefined}* + + +Sets up observes on the fsCollection to store file copies and delete +temp files at the appropriate times. + +> ```FS.FileWorker.observe = function(fsCollection) { ...``` [fileWorker.js:20](fileWorker.js#L20) + + +- + +### getReadyQuery(storeName)  undefined ### + +*This method is private* + +__Arguments__ + +* __storeName__ *{string}* + + The name of the store to observe + + + +Returns a selector that will be used to identify files that +have been uploaded but have not yet been stored to the +specified store. + +{ +$where: "this.chunkSum === this.chunkCount", +'copies.storeName`: null, +'failures.copies.storeName.doneTrying': {$ne: true} +} + +> ```function getReadyQuery(storeName) { ...``` [fileWorker.js:83](fileWorker.js#L83) + + +- + +### getDoneQuery(stores)  undefined ### + +*This method is private* + +__Arguments__ + +* __stores__ *{Object}* + + The stores object from the FS.Collection options + + + +Returns a selector that will be used to identify files where all +stores have successfully save or have failed the +max number of times but still have chunks. The resulting selector +should be something like this: + +{ +$and: [ +{chunks: {$exists: true}}, +{ +$or: [ +{ +$and: [ +{ +'copies.storeName': {$ne: null} +}, +{ +'copies.storeName': {$ne: false} +} +] +}, +{ +'failures.copies.storeName.doneTrying': true +} +] +}, +REPEATED FOR EACH STORE +] +} + + +> ```function getDoneQuery(stores) { ...``` [fileWorker.js:129](fileWorker.js#L129) + + +- + +### saveCopy(fsFile, storeName, options)  Server ### + +*This method is private* + +__Arguments__ + +* __fsFile__ *{[FS.File](#FS.File)}* +* __storeName__ *{string}* +* __options__ *{Object}* + * __overwrite__ *{Boolean}* (Optional, Default = false) + + Force save to the specified store? + + +__Returns__ *{undefined}* + + +Saves to the specified store. If the +`overwrite` option is `true`, will save to the store even if we already +have, potentially overwriting any previously saved data. Synchronous. + +> ```var makeSafeCallback = function (callback) { ...``` [fileWorker.js:168](fileWorker.js#L168) + + diff --git a/packages/worker/package.js b/packages/worker/package.js new file mode 100644 index 00000000000..435a9bcdffe --- /dev/null +++ b/packages/worker/package.js @@ -0,0 +1,36 @@ +Package.describe({ + git: 'https://github.com/CollectionFS/Meteor-cfs-worker.git', + name: 'cfs:worker', + version: '0.1.4', + summary: 'CollectionFS, file worker - handles file copies/versions' +}); + +Package.onUse(function(api) { + api.versionsFrom('1.0'); + + api.use([ + 'cfs:base-package@0.0.30', + 'cfs:tempstore@0.1.4', + 'cfs:storage-adapter@0.2.1' + ]); + + api.use([ + 'livedata', + 'mongo-livedata', + 'cfs:power-queue@0.9.11' + ]); + + api.addFiles([ + 'fileWorker.js' + ], 'server'); +}); + +// Package.on_test(function (api) { +// api.use('cfs:standard-packages@0.0.0'); + +// api.use('test-helpers', 'server'); +// api.use(['tinytest', 'underscore', 'ejson', 'ordered-dict', 'random']); + +// api.addFiles('tests/client-tests.js', 'client'); +// api.addFiles('tests/server-tests.js', 'server'); +// }); diff --git a/packages/worker/tests/client-tests.js b/packages/worker/tests/client-tests.js new file mode 100644 index 00000000000..49a1008467d --- /dev/null +++ b/packages/worker/tests/client-tests.js @@ -0,0 +1,44 @@ +function equals(a, b) { + return !!(EJSON.stringify(a) === EJSON.stringify(b)); +} + +Tinytest.add('cfs-worker - client - test environment', function(test) { + test.isTrue(typeof FS.Collection !== 'undefined', 'test environment not initialized FS.Collection'); + test.isTrue(typeof CFSErrorType !== 'undefined', 'test environment not initialized CFSErrorType'); +}); + +/* + * FS.File Client Tests + * + * construct FS.File with no arguments + * construct FS.File passing in File + * construct FS.File passing in Blob + * load blob into FS.File and then call FS.File.toDataUrl + * call FS.File.setDataFromBinary, then FS.File.getBlob(); make sure correct data is returned + * load blob into FS.File and then call FS.File.getBinary() with and without start/end; make sure correct data is returned + * construct FS.File, set FS.File.collectionName to a CFS name, and then test FS.File.update/remove/get/put/del/url + * set FS.File.name to a filename and test that FS.File.getExtension() returns the extension + * load blob into FS.File and make sure FS.File.saveLocal initiates a download (possibly can't do automatically) + * + */ + + +//Test API: +//test.isFalse(v, msg) +//test.isTrue(v, msg) +//test.equalactual, expected, message, not +//test.length(obj, len) +//test.include(s, v) +//test.isNaN(v, msg) +//test.isUndefined(v, msg) +//test.isNotNull +//test.isNull +//test.throws(func) +//test.instanceOf(obj, klass) +//test.notEqual(actual, expected, message) +//test.runId() +//test.exception(exception) +//test.expect_fail() +//test.ok(doc) +//test.fail(doc) +//test.equal(a, b, msg) diff --git a/packages/worker/tests/server-tests.js b/packages/worker/tests/server-tests.js new file mode 100644 index 00000000000..713ebc07c95 --- /dev/null +++ b/packages/worker/tests/server-tests.js @@ -0,0 +1,49 @@ +function equals(a, b) { + return !!(EJSON.stringify(a) === EJSON.stringify(b)); +} + +Tinytest.add('cfs-worker - server - test environment', function(test) { + test.isTrue(typeof FS.Collection !== 'undefined', 'test environment not initialized FS.Collection'); + test.isTrue(typeof CFSErrorType !== 'undefined', 'test environment not initialized CFSErrorType'); +}); + +/* + * FS.File Server Tests + * + * construct FS.File with no arguments + * load data with FS.File.setDataFromBuffer + * load data with FS.File.setDataFromBinary + * load data and then call FS.File.toDataUrl with and without callback + * load buffer into FS.File and then call FS.File.getBinary with and without start/end; make sure correct data is returned + * construct FS.File, set FS.File.collectionName to a CFS name, and then test FS.File.update/remove/get/put/del/url + * (call these with and without callback to test sync vs. async) + * set FS.File.name to a filename and test that FS.File.getExtension() returns the extension + * + * + * FS.Collection Server Tests + * + * Make sure options.filter is respected + * + * + */ + + +//Test API: +//test.isFalse(v, msg) +//test.isTrue(v, msg) +//test.equalactual, expected, message, not +//test.length(obj, len) +//test.include(s, v) +//test.isNaN(v, msg) +//test.isUndefined(v, msg) +//test.isNotNull +//test.isNull +//test.throws(func) +//test.instanceOf(obj, klass) +//test.notEqual(actual, expected, message) +//test.runId() +//test.exception(exception) +//test.expect_fail() +//test.ok(doc) +//test.fail(doc) +//test.equal(a, b, msg) From 86fdaa342bc6a8e5a87b5ef6fa88c872514ab1a8 Mon Sep 17 00:00:00 2001 From: lorenzo Date: Fri, 14 Apr 2017 19:14:46 +0300 Subject: [PATCH 2/2] linting --- packages/worker/fileWorker.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/worker/fileWorker.js b/packages/worker/fileWorker.js index 7a150243d84..baa5eef8ef6 100644 --- a/packages/worker/fileWorker.js +++ b/packages/worker/fileWorker.js @@ -48,7 +48,7 @@ FS.FileWorker.observe = function(fsCollection) { FS.debug && console.log("FileWorker ADDED - calling deleteChunks for", fsFile._id); try { FS.TempStore.removeFile(fsFile); - }catch(err) { + } catch(err) { console.log(err); } } @@ -180,7 +180,7 @@ function saveCopy(fsFile, storeName, options) { // Pipe the temp data into the storage adapter readStream.pipe(writeStream); - }catch(err){ + } catch(err){ console.log(err); } }