Skip to content
This repository has been archived by the owner on Mar 10, 2020. It is now read-only.

Commit

Permalink
perf: write files to repo outside of write lock
Browse files Browse the repository at this point in the history
perf: read files from repo outside of read lock

fix: standardise error messages with go

License: MIT
Signed-off-by: achingbrain <[email protected]>
  • Loading branch information
achingbrain committed Aug 8, 2018
1 parent 158bb28 commit 63940b4
Show file tree
Hide file tree
Showing 22 changed files with 305 additions and 205 deletions.
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
"detect-node": "^2.0.3",
"detect-webworker": "^1.0.0",
"dirty-chai": "^2.0.1",
"ipfs": "~0.30.0",
"ipfs": "~0.31.2",
"pull-buffer-stream": "^1.0.0",
"safe-buffer": "^5.1.1",
"tmp": "~0.0.33"
Expand All @@ -56,7 +56,7 @@
"filereader-stream": "^2.0.0",
"interface-datastore": "~0.4.2",
"ipfs-unixfs": "~0.1.15",
"ipfs-unixfs-engine": "~0.31.3",
"ipfs-unixfs-engine": "~0.32.1",
"is-pull-stream": "~0.0.0",
"is-stream": "^1.1.0",
"joi": "^13.4.0",
Expand All @@ -65,6 +65,7 @@
"once": "^1.4.0",
"promisify-es6": "^1.0.3",
"pull-cat": "^1.1.11",
"pull-defer": "~0.2.2",
"pull-paramap": "^1.2.2",
"pull-pushable": "^2.2.0",
"pull-stream": "^3.6.8",
Expand Down
9 changes: 9 additions & 0 deletions src/cli/ls.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ module.exports = {
coerce: asBoolean,
describe: 'Use long listing format.'
},
unsorted: {
alias: 'U',
type: 'boolean',
default: false,
coerce: asBoolean,
describe: 'Do not sort; list entries in directory order.'
},
cidBase: {
alias: 'cid-base',
default: 'base58btc',
Expand All @@ -33,12 +40,14 @@ module.exports = {
path,
ipfs,
long,
unsorted,
cidBase
} = argv

argv.resolve(
ipfs.files.ls(path || FILE_SEPARATOR, {
long,
unsorted,
cidBase
})
.then(files => {
Expand Down
44 changes: 33 additions & 11 deletions src/core/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,38 @@ const {
createLock
} = require('./utils')

// These operations are read-locked at the function level and will execute simultaneously
const readOperations = {
ls: require('./ls'),
read: require('./read'),
readPullStream: require('./read-pull-stream'),
readReadableStream: require('./read-readable-stream'),
stat: require('./stat')
}

// These operations are locked at the function level and will execute in series
const writeOperations = {
cp: require('./cp'),
flush: require('./flush'),
mkdir: require('./mkdir'),
mv: require('./mv'),
rm: require('./rm'),
write: require('./write')
rm: require('./rm')
}

// These operations are asynchronous and manage their own locking
const upwrappedOperations = {
write: require('./write'),
read: require('./read')
}

// These operations are synchronous and manage their own locking
const upwrappedSynchronousOperations = {
readPullStream: require('./read-pull-stream'),
readReadableStream: require('./read-readable-stream')
}

const wrap = (ipfs, mfs, operations, lock) => {
const wrap = ({
ipfs, mfs, operations, lock
}) => {
Object.keys(operations).forEach(key => {
if (operations.hasOwnProperty(key)) {
mfs[key] = promisify(lock(operations[key](ipfs)))
}
mfs[key] = promisify(lock(operations[key](ipfs)))
})
}

Expand All @@ -51,8 +61,20 @@ module.exports = (ipfs, options) => {

const mfs = {}

wrap(ipfs, mfs, readOperations, readLock)
wrap(ipfs, mfs, writeOperations, writeLock)
wrap({
ipfs, mfs, operations: readOperations, lock: readLock
})
wrap({
ipfs, mfs, operations: writeOperations, lock: writeLock
})

Object.keys(upwrappedOperations).forEach(key => {
mfs[key] = promisify(upwrappedOperations[key](ipfs))
})

Object.keys(upwrappedSynchronousOperations).forEach(key => {
mfs[key] = upwrappedSynchronousOperations[key](ipfs)
})

return mfs
}
27 changes: 14 additions & 13 deletions src/core/ls.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ const {

const defaultOptions = {
long: false,
cidBase: 'base58btc'
cidBase: 'base58btc',
unsorted: false
}

module.exports = (ipfs) => {
Expand Down Expand Up @@ -64,6 +65,17 @@ module.exports = (ipfs) => {
}
},

// https://github.com/ipfs/go-ipfs/issues/5181
(files, cb) => {
if (options.unsorted) {
return cb(null, files)
}

return cb(null, files.sort((a, b) => {
return b.name.localeCompare(a.name)
}))
},

// https://github.com/ipfs/go-ipfs/issues/5026
(files, cb) => cb(null, files.map(file => {
if (FILE_TYPES.hasOwnProperty(file.type)) {
Expand All @@ -77,18 +89,7 @@ module.exports = (ipfs) => {
}

return file
})),

// https://github.com/ipfs/go-ipfs/issues/5181
(files, cb) => {
if (options.long) {
return cb(null, files.sort((a, b) => {
return b.name.localeCompare(a.name)
}))
}

cb(null, files)
}
}))
], callback)
}
}
4 changes: 2 additions & 2 deletions src/core/mkdir.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ module.exports = (ipfs) => {
return cb(new Error('file already exists'))
}

if (error.message.includes('did not exist')) {
log(`${path} did not exist`)
if (error.message.includes('does not exist')) {
log(`${path} does not exist`)
return cb()
}

Expand Down
65 changes: 39 additions & 26 deletions src/core/read-pull-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@

const exporter = require('ipfs-unixfs-engine').exporter
const pull = require('pull-stream/pull')
const once = require('pull-stream/sources/once')
const asyncMap = require('pull-stream/throughs/async-map')
const defer = require('pull-defer')
const collect = require('pull-stream/sinks/collect')
const waterfall = require('async/waterfall')
const UnixFs = require('ipfs-unixfs')
const {
traverseTo
traverseTo,
createLock
} = require('./utils')
const log = require('debug')('ipfs:mfs:read-pull-stream')

Expand All @@ -16,39 +19,49 @@ const defaultOptions = {
}

module.exports = (ipfs) => {
return function mfsReadPullStream (path, options, callback) {
if (typeof options === 'function') {
callback = options
options = {}
}

return function mfsReadPullStream (path, options = {}) {
options = Object.assign({}, defaultOptions, options)

log(`Reading ${path}`)

waterfall([
(done) => traverseTo(ipfs, path, {
parents: false
}, done),
(result, done) => {
const deferred = defer.source()

pull(
once(path),
asyncMap((path, cb) => {
createLock().readLock((next) => {
traverseTo(ipfs, path, {
parents: false
}, next)
})(cb)
}),
asyncMap((result, cb) => {
const node = result.node
const meta = UnixFs.unmarshal(node.data)

if (meta.type !== 'file') {
return done(new Error(`${path} was not a file`))
return cb(new Error(`${path} was not a file`))
}

waterfall([
(next) => pull(
exporter(node.multihash, ipfs.dag, {
offset: options.offset,
length: options.length
}),
collect(next)
),
(files, next) => next(null, files[0].content)
], done)
}
], callback)
pull(
exporter(node.multihash, ipfs.dag, {
offset: options.offset,
length: options.length
}),
collect((error, files) => {
cb(error, error ? null : files[0].content)
})
)
}),
collect((error, streams) => {
if (error) {
return deferred.abort(error)
}

deferred.resolve(streams[0])
})
)

return deferred
}
}
13 changes: 2 additions & 11 deletions src/core/read-readable-stream.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,10 @@
'use strict'

const waterfall = require('async/waterfall')
const readPullStream = require('./read-pull-stream')
const toStream = require('pull-stream-to-stream')

module.exports = (ipfs) => {
return function mfsReadReadableStream (path, options, callback) {
if (typeof options === 'function') {
callback = options
options = {}
}

waterfall([
(cb) => readPullStream(ipfs)(path, options, cb),
(stream, cb) => cb(null, toStream.source(stream))
], callback)
return function mfsReadReadableStream (path, options = {}) {
return toStream.source(readPullStream(ipfs)(path, options))
}
}
21 changes: 10 additions & 11 deletions src/core/read.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

const pull = require('pull-stream/pull')
const collect = require('pull-stream/sinks/collect')
const waterfall = require('async/waterfall')
const readPullStream = require('./read-pull-stream')

module.exports = (ipfs) => {
Expand All @@ -12,15 +11,15 @@ module.exports = (ipfs) => {
options = {}
}

waterfall([
(cb) => readPullStream(ipfs)(path, options, cb),
(stream, cb) => pull(
stream,
collect(cb)
),
(buffers, cb) => {
cb(null, Buffer.concat(buffers))
}
], callback)
pull(
readPullStream(ipfs)(path, options),
collect((error, buffers) => {
if (error) {
return callback(error)
}

return callback(null, Buffer.concat(buffers))
})
)
}
}
15 changes: 12 additions & 3 deletions src/core/utils/create-lock.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@
const mortice = require('mortice')
const log = require('debug')('ipfs:mfs:lock')

let lock

module.exports = (repoOwner) => {
if (lock) {
return lock
}

const mutex = mortice({
// ordinarily the main thread would store the read/write lock but
// if we are the thread that owns the repo, we can store the lock
Expand All @@ -17,7 +23,8 @@ module.exports = (repoOwner) => {
mutex[`${type}Lock`](() => {
return new Promise((resolve, reject) => {
args.push((error, result) => {
log(`${type} operation callback invoked${error ? ' with error' : ''}`)
log(`${type} operation callback invoked${error ? ' with error: ' + error.message : ''}`)

if (error) {
return reject(error)
}
Expand All @@ -35,7 +42,7 @@ module.exports = (repoOwner) => {
cb(null, result)
})
.catch((error) => {
log(`Finished ${type} operation with error`)
log(`Finished ${type} operation with error: ${error.message}`)
if (callback) {
return callback(error)
}
Expand All @@ -46,7 +53,7 @@ module.exports = (repoOwner) => {
})
}

return {
lock = {
readLock: (func) => {
return function () {
const args = Array.from(arguments)
Expand All @@ -65,4 +72,6 @@ module.exports = (repoOwner) => {
}
}
}

return lock
}
1 change: 1 addition & 0 deletions src/core/utils/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ module.exports = {
formatCid: require('./format-cid'),
limitStreamBytes: require('./limit-stream-bytes'),
loadNode: require('./load-node'),
toPullSource: require('./to-pull-source'),
toSourcesAndDestination: require('./to-sources-and-destination'),
toSources: require('./to-sources'),
traverseTo: require('./traverse-to'),
Expand Down
Loading

0 comments on commit 63940b4

Please sign in to comment.