Skip to content
This repository has been archived by the owner on Aug 12, 2020. It is now read-only.

exporter: Support slicing streams stored in deeply nested DAGs #208

Merged
merged 1 commit into from
Mar 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
"lodash": "^4.17.5",
"multihashes": "~0.4.13",
"multihashing-async": "~0.4.8",
"pull-async-values": "^1.0.3",
"pull-batch": "^1.0.0",
"pull-block": "^1.4.0",
"pull-cat": "^1.1.11",
Expand All @@ -75,6 +74,7 @@
"pull-pause": "0.0.2",
"pull-pushable": "^2.2.0",
"pull-stream": "^3.6.2",
"pull-through": "^1.0.18",
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR doesn't actually use pull-through - it appears to be a missing dependency as it's used by the builder and trickle-reducer modules.

"pull-traverse": "^1.0.3",
"pull-write": "^1.1.4",
"sparse-array": "^1.3.1"
Expand Down
145 changes: 116 additions & 29 deletions src/exporter/file.js
Original file line number Diff line number Diff line change
@@ -1,31 +1,13 @@
'use strict'

const traverse = require('pull-traverse')
const traverseSlice = require('./traverse-slice')
const UnixFS = require('ipfs-unixfs')
const CID = require('cids')
const pull = require('pull-stream')
const paramap = require('pull-paramap')

// Logic to export a single (possibly chunked) unixfs file.
module.exports = (node, name, path, pathRest, resolve, size, dag, parent, depth, begin, end) => {
function getData (node) {
try {
const file = UnixFS.unmarshal(node.data)
return file.data || Buffer.alloc(0)
} catch (err) {
throw new Error('Failed to unmarshal node')
}
}

function visitor (node) {
return pull(
pull.values(node.links),
paramap((link, cb) => dag.get(new CID(link.multihash), cb)),
pull.map((result) => result.value)
)
}

const accepts = pathRest[0]

if (accepts !== undefined && accepts !== path) {
Expand All @@ -34,17 +16,7 @@ module.exports = (node, name, path, pathRest, resolve, size, dag, parent, depth,

const file = UnixFS.unmarshal(node.data)
const fileSize = size || file.fileSize()

let content

if (!isNaN(begin)) {
content = traverseSlice(node, dag, begin, end)
} else {
content = pull(
traverse.depthFirst(node, visitor),
pull.map(getData)
)
}
const content = streamBytes(dag, node, fileSize, findByteRange(fileSize, begin, end))

return pull.values([{
depth: depth,
Expand All @@ -56,3 +28,118 @@ module.exports = (node, name, path, pathRest, resolve, size, dag, parent, depth,
type: 'file'
}])
}

/**
 * Normalise a requested byte range against the size of the file.
 *
 * - A missing/falsy `begin` defaults to 0 (start of file).
 * - A missing/falsy `end`, or one past the end of the file, is clamped to
 *   `fileSize` (note: an explicit `end` of 0 therefore also means "to the
 *   end of the file").
 * - Negative offsets count backwards from the end of the file.
 *
 * @param {number} fileSize - total size of the file in bytes
 * @param {number} [begin] - requested start offset (may be negative)
 * @param {number} [end] - requested end offset, exclusive (may be negative)
 * @returns {{begin: number, end: number}} the resolved absolute byte range
 */
function findByteRange (fileSize, begin, end) {
  let start = begin || 0
  let finish = (!end || end > fileSize) ? fileSize : end

  // Interpret negative offsets relative to the end of the file
  if (start < 0) {
    start = fileSize + start
  }

  if (finish < 0) {
    finish = fileSize + finish
  }

  return {
    begin: start,
    end: finish
  }
}

// Streams the bytes in [begin, end) of a (possibly deeply nested) unixfs
// file DAG as a pull-stream of data blocks, fetching only the child nodes
// that overlap the requested range.
//
// NOTE(review): `streamPosition` is closure state mutated by `visitor` as
// pull-traverse walks the DAG depth-first; correctness depends on visitors
// being evaluated lazily, one node at a time, in traversal order — confirm
// against pull-traverse's depthFirst semantics before reordering anything.
function streamBytes (dag, node, fileSize, { begin, end }) {
  // Empty range — nothing to stream
  if (begin === end) {
    return pull.empty()
  }

  // Absolute byte offset of the data we are about to walk over
  let streamPosition = 0

  // Extracts the slice of a leaf node's data that falls inside [begin, end).
  // `start` is the absolute offset of this node's first byte in the file.
  // Returns undefined (filtered out downstream) for nodes with no data.
  function getData ({ node, start }) {
    if (!node || !node.data) {
      return
    }

    try {
      const file = UnixFS.unmarshal(node.data)

      if (!file.data) {
        return
      }

      const block = extractDataFromBlock(file.data, start, begin, end)

      return block
    } catch (err) {
      // NOTE(review): this also masks any error thrown after a successful
      // unmarshal (e.g. from extractDataFromBlock) as an unmarshal failure
      throw new Error('Failed to unmarshal node')
    }
  }

  // Called by pull-traverse for each intermediate node: returns a stream of
  // the node's children that overlap the requested byte range, each tagged
  // with its absolute start/end offsets.
  function visitor ({ node }) {
    const file = UnixFS.unmarshal(node.data)

    // work out which child nodes contain the requested data
    const filteredLinks = node.links
      .map((link, index) => {
        // blockSizes[index] is the number of file bytes covered by links[index]
        const child = {
          link: link,
          start: streamPosition,
          end: streamPosition + file.blockSizes[index]
        }

        streamPosition = child.end

        return child
      })
      .filter((child, index) => {
        return (begin >= child.start && begin < child.end) || // child has begin byte
          (end > child.start && end <= child.end) || // child has end byte
          (begin < child.start && end > child.end) // child is between begin and end bytes
      })

    if (filteredLinks.length) {
      // move stream position to the first node we're going to return data from
      streamPosition = filteredLinks[0].start
    }

    // Fetch each overlapping child from the DAG service, preserving its
    // absolute offsets; a fetch error is propagated through the callback
    return pull(
      pull.values(filteredLinks),
      paramap((child, cb) => {
        dag.get(new CID(child.link.multihash), (error, result) => cb(error, {
          start: child.start,
          end: child.end,
          node: result && result.value
        }))
      })
    )
  }

  // Walk the DAG depth-first from the root, turn each visited leaf into its
  // in-range data slice, and drop nodes that contributed no data
  return pull(
    traverse.depthFirst({
      node,
      start: 0,
      end: fileSize
    }, visitor),
    pull.map(getData),
    pull.filter(Boolean)
  )
}

/**
 * Returns the portion of a data block that falls inside the requested
 * byte range [begin, end).
 *
 * @param {Buffer} block - the raw data of one leaf node
 * @param {number} streamPosition - absolute offset of the block's first byte
 * @param {number} begin - absolute start of the requested range
 * @param {number} end - absolute end (exclusive) of the requested range
 * @returns {Buffer} the in-range slice of `block` (possibly the whole block)
 */
function extractDataFromBlock (block, streamPosition, begin, end) {
  // Capture the length before any slicing: the head-skip check below must
  // compare against the block's original extent in the file
  const blockLength = block.length
  const blockEnd = streamPosition + blockLength

  // The range ends inside this block — drop the tail beyond `end`
  if (end - streamPosition < blockLength) {
    block = block.slice(0, end - streamPosition)
  }

  // The range starts inside this block — drop the head before `begin`
  if (begin > streamPosition && begin < blockEnd) {
    block = block.slice(begin - streamPosition)
  }

  return block
}
104 changes: 0 additions & 104 deletions src/exporter/traverse-slice.js

This file was deleted.

Loading