This repository has been archived by the owner on Feb 12, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
5 changed files
with
840 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -63,6 +63,7 @@ | |
"bs58": "^3.0.0", | ||
"debug": "^2.2.0", | ||
"detect-node": "^2.0.3", | ||
"fnv": "^0.1.3", | ||
"fs-blob-store": "^5.2.1", | ||
"glob": "^7.0.3", | ||
"hapi": "^13.4.1", | ||
|
@@ -111,4 +112,4 @@ | |
"kumavis <[email protected]>", | ||
"nginnever <[email protected]>" | ||
] | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,270 @@ | ||
'use strict' | ||
|
||
const bs58 = require('bs58') | ||
const protobuf = require('protocol-buffers') | ||
const crypto = require('crypto') | ||
const fnv = require('fnv') | ||
const mDAG = require('ipfs-merkle-dag') | ||
const DAGNode = mDAG.DAGNode | ||
const DAGLink = mDAG.DAGLink | ||
const varint = require('varint') | ||
|
||
const emptyKeyHash = 'QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n' | ||
const emptyKey = new Buffer(bs58.decode(emptyKeyHash)) | ||
const defaultFanout = 256 | ||
const maxItems = 8192 | ||
|
||
// Protobuf interface | ||
const pbSchema = ( | ||
// from go-ipfs/pin/internal/pb/header.proto | ||
'message Set { ' + | ||
// 1 for now | ||
'optional uint32 version = 1; ' + | ||
// how many of the links are subtrees | ||
'optional uint32 fanout = 2; ' + | ||
// hash seed for subtree selection, a random number | ||
'optional fixed32 seed = 3; ' + | ||
'}' | ||
) | ||
const pb = protobuf(pbSchema) | ||
function readHeader (rootNode) { | ||
// rootNode.data should be a buffer of the format: | ||
// < varint(headerLength) | header | itemData... > | ||
var rootData = rootNode.data | ||
var hdrLength = varint.decode(rootData) | ||
var vBytes = varint.decode.bytes | ||
if (vBytes <= 0) { | ||
return { err: 'Invalid Set header length' } | ||
} | ||
if (vBytes + hdrLength > rootData.length) { | ||
return { err: 'Impossibly large set header length' } | ||
} | ||
var hdrSlice = rootData.slice(vBytes, hdrLength + vBytes) | ||
var header = pb.Set.decode(hdrSlice) | ||
if (header.version !== 1) { | ||
return { err: 'Unsupported Set version: ' + header.version } | ||
} | ||
if (header.fanout > rootNode.links.length) { | ||
return { err: 'Impossibly large fanout' } | ||
} | ||
return { | ||
header: header, | ||
data: rootData.slice(hdrLength + vBytes) | ||
} | ||
} | ||
|
||
exports = module.exports = function (dagS) { | ||
var pinnerUtils = { | ||
// should this be part of `object` rather than `pinner`? | ||
hasChild: (root, childhash, callback, _links, _checked, _seen) => { | ||
// callback (err, has) | ||
if (callback.fired) { return } | ||
if (typeof childhash === 'object') { | ||
childhash = bs58.encode(childhash).toString() | ||
} | ||
_links = _links || root.links.length | ||
_checked = _checked || 0 | ||
_seen = _seen || {} | ||
|
||
if (!root.links.length && _links === _checked) { | ||
// all nodes have been checked | ||
return callback(null, false) | ||
} | ||
root.links.forEach((link) => { | ||
var bs58link = bs58.encode(link.hash).toString() | ||
if (bs58link === childhash) { | ||
callback.fired = true | ||
return callback(null, true) | ||
} | ||
dagS.get(link.hash, (err, obj) => { | ||
if (err) { | ||
callback.fired = true | ||
return callback(err) | ||
} | ||
// don't check the same links twice | ||
if (bs58link in _seen) { return } | ||
_seen[bs58link] = true | ||
|
||
_checked++ | ||
_links += obj.links.length | ||
pinnerUtils.hasChild(obj, childhash, callback, _links, _checked, _seen) | ||
}) | ||
}) | ||
}, | ||
|
||
storeSet: (keys, logInternalKey, callback) => { | ||
// callback (err, rootNode) | ||
var items = keys.map((key) => { | ||
return { | ||
key: key, | ||
data: null | ||
} | ||
}) | ||
pinnerUtils.storeItems(items, logInternalKey, (err, rootNode) => { | ||
if (err) { return callback(err) } | ||
dagS.add(rootNode, (err) => { | ||
if (err) { return callback(err) } | ||
logInternalKey(rootNode.multihash()) | ||
callback(null, rootNode) | ||
}) | ||
}) | ||
}, | ||
|
||
storeItems: (items, logInternalKey, callback, _subcalls, _done) => { | ||
// callback (err, rootNode) | ||
var seed = crypto.randomBytes(4).readUInt32LE(0, true) | ||
var pbHeader = pb.Set.encode({ | ||
version: 1, | ||
fanout: defaultFanout, | ||
seed: seed | ||
}) | ||
var rootData = Buffer.concat([ | ||
new Buffer(varint.encode(pbHeader.length)), pbHeader | ||
]) | ||
var rootLinks = [] | ||
var i | ||
for (i = 0; i < defaultFanout; i++) { | ||
rootLinks.push(new DAGLink('', null, emptyKey)) | ||
} | ||
logInternalKey(emptyKey) | ||
|
||
if (items.length <= maxItems) { | ||
// the items will fit in a single root node | ||
var itemLinks = [] | ||
var itemData = [] | ||
var indices = [] | ||
for (i = 0; i < items.length; i++) { | ||
itemLinks.push(new DAGLink('', null, items[i].key)) | ||
itemData.push(items[i].data || new Buffer([])) | ||
indices.push(i) | ||
} | ||
indices.sort((a, b) => { | ||
var x = Buffer.compare(itemLinks[a].hash, itemLinks[b].hash) | ||
if (x) { return x } | ||
return (a < b ? -1 : 1) | ||
}) | ||
var sortedLinks = indices.map((i) => { return itemLinks[i] }) | ||
var sortedData = indices.map((i) => { return itemData[i] }) | ||
rootLinks = rootLinks.concat(sortedLinks) | ||
rootData = Buffer.concat([rootData].concat(sortedData)) | ||
readHeader(new DAGNode(rootData, rootLinks)) // | ||
return callback(null, new DAGNode(rootData, rootLinks)) | ||
} else { | ||
// need to split up the items into multiple root nodes | ||
// (using go-ipfs "wasteful but simple" approach for consistency) | ||
_subcalls = _subcalls || 0 | ||
_done = _done || 0 | ||
var h | ||
var hashed = {} | ||
var hashFn = (seed, key) => { | ||
var buf = new Buffer(4) | ||
var h = new fnv.FNV() | ||
buf.writeUInt32LE(seed, 0) | ||
h.update(buf) | ||
h.update(bs58.encode(key).toString()) | ||
return h.value() | ||
} | ||
// items will be distributed among `defaultFanout` bins | ||
for (i = 0; i < items.length; i++) { | ||
h = hashFn(seed, items[i].key) % defaultFanout | ||
hashed[h] = hashed[h] || [] | ||
hashed[h].push(items[i]) | ||
} | ||
var storeItemsCb = (err, child) => { | ||
if (callback.fired) { return } | ||
if (err) { | ||
callback.fired = true | ||
return callback(err) | ||
} | ||
dagS.add(child, (err) => { | ||
if (callback.fired) { return } | ||
if (err) { | ||
callback.fired = true | ||
return callback(err) | ||
} | ||
logInternalKey(child.multihash()) | ||
rootLinks[this.h] = new DAGLink( | ||
'', child.size(), child.multihash() | ||
) | ||
_done++ | ||
if (_done === _subcalls) { | ||
// all finished | ||
return callback(null, new DAGNode(rootData, rootLinks)) | ||
} | ||
}) | ||
} | ||
_subcalls += Object.keys(hashed).length | ||
for (h in hashed) { | ||
if (hashed.hasOwnProperty(h)) { | ||
pinnerUtils.storeItems( | ||
hashed[h], | ||
logInternalKey, | ||
storeItemsCb.bind({h: h}), | ||
_subcalls, | ||
_done | ||
) | ||
} | ||
} | ||
} | ||
}, | ||
|
||
loadSet: (rootNode, name, logInternalKey, callback) => { | ||
// callback (err, keys) | ||
var link = rootNode.links.filter((link) => { | ||
return link.name === name | ||
}).pop() | ||
if (!link) { return callback('No link found with name ' + name) } | ||
logInternalKey(link.hash) | ||
dagS.get(link.hash, (err, obj) => { | ||
if (err) { return callback(err) } | ||
var keys = [] | ||
var walkerFn = (link) => { | ||
keys.push(link.hash) | ||
} | ||
pinnerUtils.walkItems(obj, walkerFn, logInternalKey, (err) => { | ||
if (err) { return callback(err) } | ||
return callback(null, keys) | ||
}) | ||
}) | ||
}, | ||
|
||
walkItems: (node, walkerFn, logInternalKey, callback) => { | ||
// callback (err) | ||
var h = readHeader(node) | ||
if (h.err) { return callback(h.err) } | ||
var fanout = h.header.fanout | ||
var subwalks = 0 | ||
var finished = 0 | ||
|
||
var walkCb = (err) => { | ||
if (err) { return callback(err) } | ||
finished++ | ||
if (subwalks === finished) { | ||
return callback() | ||
} | ||
} | ||
|
||
for (var i = 0; i < node.links.length; i++) { | ||
var link = node.links[i] | ||
if (i >= fanout) { | ||
// item link | ||
walkerFn(link, i, h.data) | ||
} else { | ||
// fanout link | ||
logInternalKey(link.hash) | ||
if (!emptyKey.equals(link.hash)) { | ||
subwalks++ | ||
dagS.get(link.hash, (err, obj) => { | ||
if (err) { return callback(err) } | ||
pinnerUtils.walkItems(obj, walkerFn, logInternalKey, walkCb) | ||
}) | ||
} | ||
} | ||
} | ||
if (!subwalks) { | ||
return callback() | ||
} | ||
} | ||
} | ||
return pinnerUtils | ||
} |
Oops, something went wrong.