Skip to content

Commit

Permalink
Merge pull request #76 from ipfs/pull
Browse files Browse the repository at this point in the history
[WIP] Move to pull-streams
  • Loading branch information
daviddias authored Sep 8, 2016
2 parents 6a92978 + 08e68b3 commit 0b31f17
Show file tree
Hide file tree
Showing 46 changed files with 602 additions and 509 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ language: node_js
node_js:
- 4
- 5
- stable

# Make sure we have new NPM.
before_install:
Expand Down
24 changes: 13 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ This is the implementation of the [IPFS repo spec](https://github.com/ipfs/specs
- [repo.config.get(cb(err, config))](#repoconfiggetcberr-config)
- [repo.config.set(config, cb(err))](#repoconfigsetconfig-cberr)
- [repo.keys](#repokeys)
- [repo.datastore.read(key, cb(err, buffer))](#repodatastorereadkey-cberr-buffer)
- [repo.datastore.write(buffer, cb(err, buffer))](#repodatastorewritebuffer-cberr-buffer)
- [repo.datastoreLegacy](#repodatastorelegacy)
- [repo.blockstore.putStream()](#)
- [repo.blockstore.getStream(key, extension)](#)
- [repo.datastore](#repodatastore)
- [Contribute](#contribute)
- [License](#license)

Expand All @@ -45,7 +45,7 @@ Here is the architectural reasoning for this repo:
│ interface defined by Repo Spec │
├─────────────────────────────────┤
│ │ ┌──────────────────────┐
│ │ │ abstract-blob-store │
│ │ │ interface-pull-blob-store │
│ IPFS REPO │─────────────────────────────────▶│ interface │
│ │ ├──────────────────────┤
│ │ │ locks │
Expand All @@ -60,15 +60,15 @@ Here is the architectural reasoning for this repo:
│ interface │ │ interface │ │ interface │ │ interface │ │ interface │ │ interface │
├───────────┤ ├───────────┤ ├───────────┤ ├───────────┤ ├───────────┤ ├───────────┤
│ │ │ │ │ │ │ │ │ │ │ │
│ keys │ │ config │ │ datastore │ │ datastore │ │ logs │ │ version │
│ │ │ │ │ │ │ -legacy │ │ │ │ │
│ keys │ │ config │ │ blockstore │ │ datastore │ │ logs │ │ version │
│ │ │ │ │ │ │ │ │ │ │ │
└───────────┘ └───────────┘ └───────────┘ └───────────┘ └───────────┘ └───────────┘
```

This provides a well defined interface for creating and interacting with an IPFS
Repo backed by a group of abstract backends for keys, configuration, logs, and
more. Each of the individual repos has an interface defined by
[abstract-blob-store](https://github.com/maxogden/abstract-blob-store): this
[interface-pull-blob-store](https://github.com/ipfs/interface-pull-blob-store): this
enables us to make IPFS Repo portable (running on Node.js vs the browser) and
accept different types of storage mechanisms for each repo (fs, levelDB, etc).

Expand Down Expand Up @@ -136,7 +136,7 @@ Creates a **reference** to an IPFS repository at the path `path`. This does
Valid keys for `opts` include:

- `stores`: either an
[abstract-blob-store](https://github.com/maxogden/abstract-blob-store), or a
[interface-pull-blob-store](https://github.com/ipfs/interface-pull-blob-store), or a
map of the form

```js
Expand Down Expand Up @@ -173,12 +173,14 @@ Read/write keys inside the repo. This feature will be expanded once
[IPRS](https://github.com/ipfs/specs/tree/master/records) and
[KeyChain](https://github.com/ipfs/specs/tree/master/keychain) are finalized and implemented on go-ipfs.

### repo.datastore.read(key, cb(err, buffer))
### repo.datastore.write(buffer, cb(err, buffer))
### repo.blockstore.putStream()
### repo.datastore.getStream(key, extension)
### repo.datastore.has(key, extension, cb)
### repo.datastore.delete(key, extension, cb)

Read and write buffers to/from the repo's block store.

### repo.datastoreLegacy
### repo.datastore

**WIP**

Expand Down
35 changes: 19 additions & 16 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,27 +29,30 @@
],
"homepage": "https://github.com/ipfs/js-ipfs-repo",
"devDependencies": {
"abstract-blob-store": "^3.2.0",
"aegir": "^5.0.1",
"async": "^2.0.1",
"aegir": "^8.0.0",
"buffer-loader": "^0.0.1",
"chai": "^3.5.0",
"fs-blob-store": "^5.2.1",
"idb-plus-blob-store": "^1.1.2",
"lodash": "^4.13.1",
"fs-pull-blob-store": "^0.3.0",
"idb-pull-blob-store": "^0.4.0",
"interface-pull-blob-store": "^0.5.0",
"lodash": "^4.15.0",
"multihashes": "^0.2.2",
"ncp": "^2.0.0",
"pre-commit": "^1.1.2",
"rimraf": "^2.5.2"
"pre-commit": "^1.1.3",
"rimraf": "^2.5.4"
},
"dependencies": {
"babel-runtime": "^6.6.1",
"bl": "^1.1.2",
"concat-stream": "^1.5.1",
"babel-runtime": "^6.11.6",
"base32.js": "^0.1.0",
"ipfs-block": "^0.3.0",
"lock": "^0.1.2",
"lockfile": "^1.0.1",
"multihashes": "^0.2.1",
"xtend": "^4.0.1"
"lock": "^0.1.3",
"multihashes": "^0.2.2",
"pull-stream": "^3.4.5",
"pull-through": "^1.0.18",
"pull-write": "^1.1.0",
"run-parallel": "^1.1.6",
"run-series": "^1.1.4",
"safe-buffer": "^5.0.1"
},
"license": "MIT",
"contributors": [
Expand All @@ -61,4 +64,4 @@
"Stephen Whitmore <[email protected]>",
"greenkeeperio-bot <[email protected]>"
]
}
}
81 changes: 42 additions & 39 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,53 +1,56 @@
'use strict'

const assert = require('assert')

const stores = require('./stores')

function Repo (repoPath, options) {
if (!(this instanceof Repo)) {
return new Repo(repoPath, options)
module.exports = class Repo {
constructor (repoPath, options) {
assert.equal(typeof repoPath, 'string', 'missing repoPath')
assert(options, 'missing options')
assert(options.stores, 'missing options.stores')

this.path = repoPath

const blobStores = initializeBlobStores(options.stores)

const setup = (name, needs) => {
needs = needs || {}
const args = [repoPath, blobStores[name]]
if (needs.locks) {
args.push(this.locks)
}

if (needs.config) {
args.push(this.config)
}

return stores[name].setUp.apply(stores[name], args)
}

this.locks = setup('locks')
this.version = setup('version', {locks: true})
this.config = setup('config', {locks: true})
this.keys = setup('keys', {locks: true, config: true})
this.blockstore = setup('blockstore', {locks: true})
}

exists (callback) {
this.version.exists(callback)
}
if (!options) { throw new Error('missing options param') }
if (!options.stores) { throw new Error('missing options.stores param') }

// If options.stores is an abstract-blob-store instead of a map, use it for
// all stores.
if (options.stores.prototype && options.stores.prototype.createWriteStream) {
const store = options.stores
options.stores = {
}

function initializeBlobStores (store) {
if (store.constructor) {
return {
keys: store,
config: store,
datastore: store,
blockstore: store,
logs: store,
locks: store,
version: store
}
}

this.path = repoPath

this.locks = stores
.locks
.setUp(repoPath, options.stores.locks)

this.exists = (callback) => {
this.version.exists(callback)
}

this.version = stores
.version
.setUp(repoPath, options.stores.version, this.locks)

this.config = stores
.config
.setUp(repoPath, options.stores.config, this.locks)

this.keys = stores
.keys
.setUp(repoPath, options.stores.keys, this.locks, this.config)

this.datastore = stores
.datastore
.setUp(repoPath, options.stores.datastore, this.locks)
return store
}

exports = module.exports = Repo
137 changes: 137 additions & 0 deletions src/stores/blockstore.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
'use strict'

const Block = require('ipfs-block')
const pull = require('pull-stream')
const Lock = require('lock')
const base32 = require('base32.js')
const path = require('path')
const write = require('pull-write')
const parallel = require('run-parallel')
const through = require('pull-through')

const PREFIX_LENGTH = 5

exports = module.exports

function multihashToPath (multihash, extension) {
extension = extension || 'data'
const encoder = new base32.Encoder()
const hash = encoder.write(multihash).finalize()
const filename = `${hash}.${extension}`
const folder = filename.slice(0, PREFIX_LENGTH)

return path.join(folder, filename)
}

exports.setUp = (basePath, BlobStore, locks) => {
const store = new BlobStore(basePath + '/blocks')
const lock = new Lock()

function writeBlock (block, cb) {
if (!block || !block.data) {
return cb(new Error('Invalid block'))
}

const key = multihashToPath(block.key, block.extension)

lock(key, (release) => pull(
pull.values([block.data]),
store.write(key, release((err) => {
if (err) {
return cb(err)
}
cb(null, {key})
}))
))
}

return {
getStream (key, extension) {
if (!key) {
return pull.error(new Error('Invalid key'))
}

const p = multihashToPath(key, extension)

const ext = extension === 'data' ? 'protobuf' : extension
let data = []

return pull(
store.read(p),
through(function (values) {
data = data.concat(values)
}, function () {
this.queue(new Block(Buffer.concat(data), ext))
this.queue(null)
})
)
},

putStream () {
let ended = false
let written = []
let push = null

const sink = write((blocks, cb) => {
parallel(blocks.map((block) => (cb) => {
writeBlock(block, (err, meta) => {
if (err) return cb(err)
if (push) {
const read = push
push = null
read(null, meta)
return cb()
}

written.push(meta)
cb()
})
}), cb)
}, null, 100, (err) => {
ended = err || true
if (push) push(ended)
})

const source = (end, cb) => {
if (end) ended = end
if (ended) return cb(ended)

if (written.length) {
return cb(null, written.shift())
}

push = cb
}

return {source, sink}
},

has (key, extension, cb) {
if (typeof extension === 'function') {
cb = extension
extension = undefined
}

if (!key) {
return cb(new Error('Invalid key'))
}

const p = multihashToPath(key, extension)
store.exists(p, cb)
},

delete (key, extension, cb) {
if (typeof extension === 'function') {
cb = extension
extension = undefined
}

if (!key) {
return cb(new Error('Invalid key'))
}

const p = multihashToPath(key, extension)
store.remove(p, cb)
}
}
}
Loading

0 comments on commit 0b31f17

Please sign in to comment.