dagbuilder.go · 408 lines (350 loc) · 12.4 KB
(This repository was archived by the owner on Jun 27, 2023 and is now read-only.)
package helpers
import (
"context"
"errors"
"io"
"os"
dag "github.com/ipfs/go-merkledag"
ft "github.com/ipfs/go-unixfs"
pb "github.com/ipfs/go-unixfs/pb"
cid "github.com/ipfs/go-cid"
chunker "github.com/ipfs/go-ipfs-chunker"
"github.com/ipfs/go-ipfs-files"
pi "github.com/ipfs/go-ipfs-posinfo"
ipld "github.com/ipfs/go-ipld-format"
)
// Deprecated: use github.com/ipfs/boxo/ipld/unixfs/importer/helpers.ErrMissingFsRef
var ErrMissingFsRef = errors.New("missing file path or URL, can't create filestore reference")
// DagBuilderHelper wraps together the objects needed to
// efficiently create UnixFS DAG trees.
//
// Deprecated: use github.com/ipfs/boxo/ipld/unixfs/importer/helpers.DagBuilderHelper
type DagBuilderHelper struct {
dserv ipld.DAGService
spl chunker.Splitter
recvdErr error
rawLeaves bool
nextData []byte // the next item to return.
maxlinks int
cidBuilder cid.Builder
// Filestore support variables.
// ----------------------------
// TODO: Encapsulate in `FilestoreNode` (which is basically what they are).
//
// Besides having the path this variable (if set) is used as a flag
// to indicate that Filestore should be used.
fullPath string
stat os.FileInfo
// Keeps track of the current file size added to the DAG (used in
// the balanced builder). It is assumed that the `DagBuilderHelper`
// is not reused to construct another DAG, but a new one (with a
// zero `offset`) is created.
offset uint64
}
// DagBuilderParams wraps configuration options to create a DagBuilderHelper
// from a chunker.Splitter.
//
// Deprecated: use github.com/ipfs/boxo/ipld/unixfs/importer/helpers.DagBuilderParams
type DagBuilderParams struct {
// Maximum number of links per intermediate node
Maxlinks int
// RawLeaves signifies that the importer should use raw ipld nodes as leaves
// instead of using the unixfs TRaw type
RawLeaves bool
// CID Builder to use if set
CidBuilder cid.Builder
// DAGService to write blocks to (required)
Dagserv ipld.DAGService
// NoCopy signals to the chunker that it should track fileinfo for
// filestore adds
NoCopy bool
}
// New generates a new DagBuilderHelper from the given params and a given
// chunker.Splitter as data source.
func (dbp *DagBuilderParams) New(spl chunker.Splitter) (*DagBuilderHelper, error) {
db := &DagBuilderHelper{
dserv: dbp.Dagserv,
spl: spl,
rawLeaves: dbp.RawLeaves,
cidBuilder: dbp.CidBuilder,
maxlinks: dbp.Maxlinks,
}
if fi, ok := spl.Reader().(files.FileInfo); dbp.NoCopy && ok {
db.fullPath = fi.AbsPath()
db.stat = fi.Stat()
}
if dbp.NoCopy && db.fullPath == "" { // Enforce NoCopy
return nil, ErrMissingFsRef
}
return db, nil
}
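// exampleNewDagBuilder is an illustrative sketch, not part of the original
// file: it shows how a caller might construct a DagBuilderHelper from
// DagBuilderParams. The DAGService and reader are assumed to be supplied by
// the caller; DefaultLinksPerBlock is defined elsewhere in this package, and
// NewSizeSplitter/DefaultBlockSize come from go-ipfs-chunker.
func exampleNewDagBuilder(ds ipld.DAGService, r io.Reader) (*DagBuilderHelper, error) {
	dbp := DagBuilderParams{
		Maxlinks:  DefaultLinksPerBlock, // fan-out of intermediate nodes
		RawLeaves: true,                 // store leaves as raw IPLD nodes
		Dagserv:   ds,
	}
	// Fixed-size chunking; any other chunker.Splitter would work the same way.
	return dbp.New(chunker.NewSizeSplitter(r, chunker.DefaultBlockSize))
}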
// prepareNext consumes the next item from the splitter and puts it
// in the nextData field. It is idempotent: if nextData is already
// populated, it does nothing.
func (db *DagBuilderHelper) prepareNext() {
// if we already have data waiting to be consumed, we're ready
if db.nextData != nil || db.recvdErr != nil {
return
}
db.nextData, db.recvdErr = db.spl.NextBytes()
if db.recvdErr == io.EOF {
db.recvdErr = nil
}
}
// Done returns whether or not we're done consuming the incoming data.
func (db *DagBuilderHelper) Done() bool {
// Ensure we have an accurate perspective on the data,
// as `Done` may be called before `Next`.
db.prepareNext() // idempotent
if db.recvdErr != nil {
return false
}
return db.nextData == nil
}
// Next returns the next chunk of data to be inserted into the dag.
// If it returns nil, that signifies that the stream is at an end and
// that the current building operation should finish.
func (db *DagBuilderHelper) Next() ([]byte, error) {
db.prepareNext() // idempotent
d := db.nextData
db.nextData = nil // signal we've consumed it
if db.recvdErr != nil {
return nil, db.recvdErr
}
return d, nil
}
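// exampleDrainSplitter is an illustrative sketch, not part of the original
// file: it shows the Done/Next consumption pattern that the layout builders
// rely on. Done is checked before each Next call, and a nil chunk from Next
// also signals the end of the stream.
func exampleDrainSplitter(db *DagBuilderHelper) (int, error) {
	total := 0
	for !db.Done() {
		chunk, err := db.Next()
		if err != nil {
			return total, err
		}
		if chunk == nil { // end of stream (defensive; Done normally catches this)
			break
		}
		total += len(chunk)
	}
	return total, nil
}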
// GetDagServ returns the dagservice object this Helper is using
func (db *DagBuilderHelper) GetDagServ() ipld.DAGService {
return db.dserv
}
// GetCidBuilder returns the internal `cid.CidBuilder` set in the builder.
func (db *DagBuilderHelper) GetCidBuilder() cid.Builder {
return db.cidBuilder
}
// NewLeafNode creates a leaf node filled with data. If rawLeaves is
// set, a raw leaf will be returned. Otherwise, it will create and
// return an `FSNodeOverDag` with `fsNodeType`.
func (db *DagBuilderHelper) NewLeafNode(data []byte, fsNodeType pb.Data_DataType) (ipld.Node, error) {
if len(data) > BlockSizeLimit {
return nil, ErrSizeLimitExceeded
}
if db.rawLeaves {
// Encapsulate the data in a raw node.
if db.cidBuilder == nil {
return dag.NewRawNode(data), nil
}
rawnode, err := dag.NewRawNodeWPrefix(data, db.cidBuilder)
if err != nil {
return nil, err
}
return rawnode, nil
}
// Encapsulate the data in UnixFS node (instead of a raw node).
fsNodeOverDag := db.NewFSNodeOverDag(fsNodeType)
fsNodeOverDag.SetFileData(data)
node, err := fsNodeOverDag.Commit()
if err != nil {
return nil, err
}
// TODO: Encapsulate this sequence of calls into a function that
// just returns the final `ipld.Node` avoiding going through
// `FSNodeOverDag`.
return node, nil
}
// FillNodeLayer will add data nodes as children to the given node
// until the layer is full or there is no more data.
// NOTE: This function creates raw data nodes, so it only works
// for the `trickle.Layout`.
func (db *DagBuilderHelper) FillNodeLayer(node *FSNodeOverDag) error {
// while we have room AND we're not done
for node.NumChildren() < db.maxlinks && !db.Done() {
child, childFileSize, err := db.NewLeafDataNode(ft.TRaw)
if err != nil {
return err
}
if err := node.AddChild(child, childFileSize, db); err != nil {
return err
}
}
// TODO: Do we need to commit here? The caller who created the
// `FSNodeOverDag` should be in charge of that.
_, err := node.Commit()
return err
}
// NewLeafDataNode builds the `node` with the data obtained from the
// Splitter with the given constraints (BlockSizeLimit, RawLeaves)
// specified when creating the DagBuilderHelper. It returns the
// `ipld.Node` along with its `dataSize` (which will be used to keep
// track of the DAG file size). The size of the data is computed here because
// after that it will be hidden by `NewLeafNode` inside a generic
// `ipld.Node` representation.
func (db *DagBuilderHelper) NewLeafDataNode(fsNodeType pb.Data_DataType) (node ipld.Node, dataSize uint64, err error) {
fileData, err := db.Next()
if err != nil {
return nil, 0, err
}
dataSize = uint64(len(fileData))
// Create a new leaf node containing the file chunk data.
node, err = db.NewLeafNode(fileData, fsNodeType)
if err != nil {
return nil, 0, err
}
// Convert this leaf to a `FilestoreNode` if needed.
node = db.ProcessFileStore(node, dataSize)
return node, dataSize, nil
}
// ProcessFileStore generates, if Filestore is being used, the
// `FilestoreNode` representation of the `ipld.Node` that
// contains the file data. If Filestore is not being used, it just
// returns the same node to continue with its addition to the DAG.
//
// The `db.offset` is updated at this point (rather than when
// `NewLeafDataNode` is called); the two work in tandem, but the
// offset is more closely tied to this function.
func (db *DagBuilderHelper) ProcessFileStore(node ipld.Node, dataSize uint64) ipld.Node {
// Check if Filestore is being used.
if db.fullPath != "" {
// Check if the node is actually a raw node (needed for
// Filestore support).
if _, ok := node.(*dag.RawNode); ok {
fn := &pi.FilestoreNode{
Node: node,
PosInfo: &pi.PosInfo{
Offset: db.offset,
FullPath: db.fullPath,
Stat: db.stat,
},
}
// Update `offset` with the size of the data generated by `db.Next`.
db.offset += dataSize
return fn
}
}
// Filestore is not used, return the same `node` argument.
return node
}
// Add inserts the given node in the DAGService.
func (db *DagBuilderHelper) Add(node ipld.Node) error {
return db.dserv.Add(context.TODO(), node)
}
// Maxlinks returns the configured maximum number of links per node
// built with this helper.
func (db *DagBuilderHelper) Maxlinks() int {
return db.maxlinks
}
// FSNodeOverDag encapsulates an `unixfs.FSNode` that will be stored in a
// `dag.ProtoNode`. Instead of just having a single `ipld.Node` that
// would need to be constantly (un)packed to access and modify its
// internal `FSNode` in the process of creating a UnixFS DAG, this
// structure stores an `FSNode` cache to manipulate it (add child nodes)
// directly, and only when the node has reached its final (immutable) state
// (signaled by calling `Commit()`) is it committed to a single (indivisible)
// `ipld.Node`.
//
// It is used mainly for internal (non-leaf) nodes, and for some
// representations of data leaf nodes (that don't use raw nodes or
// Filestore).
//
// It aims to replace the `UnixfsNode` structure which encapsulated too
// many possible node state combinations.
//
// TODO: Revisit the name.
//
// Deprecated: use github.com/ipfs/boxo/ipld/unixfs/importer/helpers.FSNodeOverDag
type FSNodeOverDag struct {
dag *dag.ProtoNode
file *ft.FSNode
}
// NewFSNodeOverDag creates a new `dag.ProtoNode` and `ft.FSNode`
// decoupled from one another (and they will stay that way until
// `Commit` is called), with `fsNodeType` specifying the type of
// the UnixFS layer node (either `File` or `Raw`).
func (db *DagBuilderHelper) NewFSNodeOverDag(fsNodeType pb.Data_DataType) *FSNodeOverDag {
node := new(FSNodeOverDag)
node.dag = new(dag.ProtoNode)
node.dag.SetCidBuilder(db.GetCidBuilder())
node.file = ft.NewFSNode(fsNodeType)
return node
}
// NewFSNFromDag reconstructs an FSNodeOverDag node from a given dag node.
func (db *DagBuilderHelper) NewFSNFromDag(nd *dag.ProtoNode) (*FSNodeOverDag, error) {
return NewFSNFromDag(nd)
}
// NewFSNFromDag reconstructs an FSNodeOverDag node from a given dag node.
//
// Deprecated: use github.com/ipfs/boxo/ipld/unixfs/importer/helpers.NewFSNFromDag
func NewFSNFromDag(nd *dag.ProtoNode) (*FSNodeOverDag, error) {
mb, err := ft.FSNodeFromBytes(nd.Data())
if err != nil {
return nil, err
}
return &FSNodeOverDag{
dag: nd,
file: mb,
}, nil
}
// AddChild adds a `child` `ipld.Node` to both node layers. The
// `dag.ProtoNode` creates a link to the child node while the
// `ft.FSNode` stores its file size (that is, not the size of the
// node but the size of the file data that it is storing at the
// UnixFS layer). The child is also stored in the `DAGService`.
func (n *FSNodeOverDag) AddChild(child ipld.Node, fileSize uint64, db *DagBuilderHelper) error {
err := n.dag.AddNodeLink("", child)
if err != nil {
return err
}
n.file.AddBlockSize(fileSize)
return db.Add(child)
}
// RemoveChild deletes the child node at the given index.
func (n *FSNodeOverDag) RemoveChild(index int, dbh *DagBuilderHelper) {
n.file.RemoveBlockSize(index)
n.dag.SetLinks(append(n.dag.Links()[:index], n.dag.Links()[index+1:]...))
}
// Commit unifies (resolves) the cache nodes into a single `ipld.Node`
// that represents them: the `ft.FSNode` is encoded inside the
// `dag.ProtoNode`.
//
// TODO: Make it read-only after committing, allow to commit only once.
func (n *FSNodeOverDag) Commit() (ipld.Node, error) {
fileData, err := n.file.GetBytes()
if err != nil {
return nil, err
}
n.dag.SetData(fileData)
return n.dag, nil
}
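// exampleInternalNode is an illustrative sketch, not part of the original
// file: it assembles one internal UnixFS node by pulling leaves from the
// helper with NewLeafDataNode, linking them in with AddChild (which also
// writes each leaf to the DAGService), and finally resolving the cached
// ft.FSNode into the dag.ProtoNode with Commit.
func exampleInternalNode(db *DagBuilderHelper) (ipld.Node, error) {
	parent := db.NewFSNodeOverDag(ft.TFile)
	for parent.NumChildren() < db.Maxlinks() && !db.Done() {
		leaf, fileSize, err := db.NewLeafDataNode(ft.TRaw)
		if err != nil {
			return nil, err
		}
		if err := parent.AddChild(leaf, fileSize, db); err != nil {
			return nil, err
		}
	}
	return parent.Commit()
}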
// NumChildren returns the number of children of the `ft.FSNode`.
func (n *FSNodeOverDag) NumChildren() int {
return n.file.NumChildren()
}
// FileSize returns the `Filesize` attribute from the underlying
// representation of the `ft.FSNode`.
func (n *FSNodeOverDag) FileSize() uint64 {
return n.file.FileSize()
}
// SetFileData stores the `fileData` in the `ft.FSNode`. It
// should be used only when `FSNodeOverDag` represents a leaf
// node (internal nodes don't carry data, just file sizes).
func (n *FSNodeOverDag) SetFileData(fileData []byte) {
n.file.SetData(fileData)
}
// GetDagNode fills out the proper formatting for the FSNodeOverDag node
// inside a DAG node and returns that dag node.
// TODO: Check if we have committed (passed the UnixFS information
// to the DAG layer) before returning this.
func (n *FSNodeOverDag) GetDagNode() (ipld.Node, error) {
return n.dag, nil
}
// GetChild gets the ith child of this node from the given DAGService.
func (n *FSNodeOverDag) GetChild(ctx context.Context, i int, ds ipld.DAGService) (*FSNodeOverDag, error) {
nd, err := n.dag.Links()[i].GetNode(ctx, ds)
if err != nil {
return nil, err
}
pbn, ok := nd.(*dag.ProtoNode)
if !ok {
return nil, dag.ErrNotProtobuf
}
return NewFSNFromDag(pbn)
}
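// exampleReadBack is an illustrative sketch, not part of the original file:
// it re-wraps a stored dag.ProtoNode as an FSNodeOverDag via NewFSNFromDag
// and descends into its first child through the given DAGService. Note that
// GetChild only succeeds when the child is itself a ProtoNode; raw leaves
// would return dag.ErrNotProtobuf.
func exampleReadBack(ctx context.Context, ds ipld.DAGService, root *dag.ProtoNode) (uint64, uint64, error) {
	fsn, err := NewFSNFromDag(root)
	if err != nil {
		return 0, 0, err
	}
	rootSize := fsn.FileSize() // size of the whole file at the UnixFS layer
	if fsn.NumChildren() == 0 {
		return rootSize, 0, nil
	}
	child, err := fsn.GetChild(ctx, 0, ds)
	if err != nil {
		return rootSize, 0, err
	}
	return rootSize, child.FileSize(), nil // child covers only its own subtree
}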