Skip to content

Commit

Permalink
fix(datastore): keep locks on writes
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire committed Apr 22, 2016
1 parent a8ce3a6 commit a9c48e4
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 2 deletions.
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
"bl": "^1.1.2",
"concat-stream": "^1.5.1",
"level-js": "^2.2.3",
"lock": "^0.1.2",
"lockfile": "^1.0.1",
"multihashes": "^0.2.1",
"xtend": "^4.0.1"
Expand All @@ -63,4 +64,4 @@
"dignifiedquire <[email protected]>",
"greenkeeperio-bot <[email protected]>"
]
}
}
15 changes: 14 additions & 1 deletion src/stores/datastore.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
'use strict'

const Lock = require('lock')
const stream = require('stream')

const PREFIX_LENGTH = 8

exports = module.exports
Expand All @@ -15,6 +18,7 @@ function multihashToPath (multihash, extension) {

exports.setUp = (basePath, blobStore, locks) => {
const store = blobStore(basePath + '/blocks')
const lock = new Lock()

return {
createReadStream: (multihash, extension) => {
Expand All @@ -29,8 +33,16 @@ exports.setUp = (basePath, blobStore, locks) => {
}

const path = multihashToPath(multihash, extension)
return store.createWriteStream(path, cb)
const through = stream.PassThrough()

lock(path, (release) => {
const ws = store.createWriteStream(path, release(cb))
through.pipe(ws)
})

return through
},

exists: (multihash, extension, cb) => {
if (typeof extension === 'function') {
cb = extension
Expand All @@ -40,6 +52,7 @@ exports.setUp = (basePath, blobStore, locks) => {
const path = multihashToPath(multihash, extension)
return store.exists(path, cb)
},

remove: (multihash, extension, cb) => {
if (typeof extension === 'function') {
cb = extension
Expand Down
24 changes: 24 additions & 0 deletions test/repo-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,30 @@ module.exports = function (repo) {
}).end(data)
})

it('write locks', (done) => {
const rnd = 'QmVtU7ths96fMgZ8YSZAbKghyieq7AjxNdcqyVtesthash'
const mh = new Buffer(base58.decode(rnd))
const data = new Buffer('Oh the data')

let i = 0
const finish = () => {
i++
if (i === 2) done()
}

repo.datastore.createWriteStream(mh, (err, metadata) => {
expect(err).to.not.exist
expect(metadata.key).to.equal('12207028/122070286b9afa6620a66f715c7020d68af3d10e1a497971629c07605f55537ce990.data')
finish()
}).end(data)

repo.datastore.createWriteStream(mh, (err, metadata) => {
expect(err).to.not.exist
expect(metadata.key).to.equal('12207028/122070286b9afa6620a66f715c7020d68af3d10e1a497971629c07605f55537ce990.data')
finish()
}).end(data)
})

it('block exists', function (done) {
const buf = new Buffer(base58.decode(baseFileHash))

Expand Down

0 comments on commit a9c48e4

Please sign in to comment.