Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add filter option to de-duplicate blocks in car files #557

Merged
merged 18 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/car/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
"@helia/interface": "^4.3.0",
"@ipld/car": "^5.3.0",
"@libp2p/interfaces": "^3.3.2",
"@libp2p/utils": "^5.4.6",
"interface-blockstore": "^5.2.10",
"it-drain": "^3.0.5",
"it-map": "^3.0.5",
Expand All @@ -151,10 +152,12 @@
"progress-events": "^1.0.0"
},
"devDependencies": {
"@helia/mfs": "^3.0.6",
"@helia/unixfs": "^3.0.6",
"@ipld/dag-pb": "^4.1.0",
"aegir": "^44.0.1",
"blockstore-core": "^4.4.0",
"datastore-core": "^9.2.9",
"ipfs-unixfs-importer": "^15.2.4",
"it-to-buffer": "^4.0.5"
},
Expand Down
20 changes: 17 additions & 3 deletions packages/car/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ import type { DAGWalker } from '@helia/interface'
import type { GetBlockProgressEvents, PutManyBlocksProgressEvents } from '@helia/interface/blocks'
import type { CarReader } from '@ipld/car'
import type { AbortOptions } from '@libp2p/interfaces'
import type { Filter } from '@libp2p/utils/filters'
import type { Blockstore } from 'interface-blockstore'
import type { CID } from 'multiformats/cid'
import type { ProgressOptions } from 'progress-events'
Expand All @@ -76,6 +77,13 @@ export interface CarComponents {
dagWalkers: Record<number, DAGWalker>
}

interface ExportCarOptions extends AbortOptions, ProgressOptions<GetBlockProgressEvents> {
/**
* If a filter is passed it will be used to deduplicate blocks exported in the car file
*/
blockFilter?: Filter
}

/**
* The Car interface provides operations for importing and exporting Car files
* from Helia's underlying blockstore.
Expand Down Expand Up @@ -129,7 +137,7 @@ export interface Car {
* await eventPromise
* ```
*/
export(root: CID | CID[], writer: Pick<CarWriter, 'put' | 'close'>, options?: AbortOptions & ProgressOptions<GetBlockProgressEvents>): Promise<void>
export(root: CID | CID[], writer: Pick<CarWriter, 'put' | 'close'>, options?: ExportCarOptions): Promise<void>

/**
* Returns an AsyncGenerator that yields CAR file bytes.
Expand Down Expand Up @@ -170,7 +178,7 @@ class DefaultCar implements Car {
))
}

async export (root: CID | CID[], writer: Pick<CarWriter, 'put' | 'close'>, options?: AbortOptions & ProgressOptions<GetBlockProgressEvents>): Promise<void> {
async export (root: CID | CID[], writer: Pick<CarWriter, 'put' | 'close'>, options?: ExportCarOptions): Promise<void> {
const deferred = defer<Error | undefined>()
const roots = Array.isArray(root) ? root : [root]

Expand All @@ -189,6 +197,12 @@ class DefaultCar implements Car {
for (const root of roots) {
void queue.add(async () => {
await this.#walkDag(root, queue, async (cid, bytes) => {
// if a filter has been passed, skip blocks that have already been written
if (options?.blockFilter?.has(cid.multihash.bytes) === true) {
return
}

options?.blockFilter?.add(cid.multihash.bytes)
await writer.put({ cid, bytes })
}, options)
})
Expand All @@ -203,7 +217,7 @@ class DefaultCar implements Car {
}
}

async * stream (root: CID | CID[], options?: AbortOptions & ProgressOptions<GetBlockProgressEvents>): AsyncGenerator<Uint8Array, void, undefined> {
async * stream (root: CID | CID[], options?: ExportCarOptions): AsyncGenerator<Uint8Array, void, undefined> {
const { writer, out } = CarWriter.create(root)

// has to be done async so we write to `writer` and read from `out` at the
Expand Down
54 changes: 54 additions & 0 deletions packages/car/test/index.spec.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
/* eslint-env mocha */

import { mfs } from '@helia/mfs'
import { type UnixFS, unixfs } from '@helia/unixfs'
import { CarReader } from '@ipld/car'
import { createScalableCuckooFilter } from '@libp2p/utils/filters'
import { expect } from 'aegir/chai'
import { MemoryBlockstore } from 'blockstore-core'
import { MemoryDatastore } from 'datastore-core'
import { fixedSize } from 'ipfs-unixfs-importer/chunker'
import toBuffer from 'it-to-buffer'
import { car, type Car } from '../src/index.js'
Expand Down Expand Up @@ -115,4 +118,55 @@ describe('import/export car file', () => {
expect(await toBuffer(u.cat(cid2))).to.equalBytes(fileData2)
expect(await toBuffer(u.cat(cid3))).to.equalBytes(fileData3)
})

it('exports a car file without duplicates', async () => {
const otherBlockstore = new MemoryBlockstore()
const otherUnixFS = unixfs({ blockstore: otherBlockstore })
const otherDatastore = new MemoryDatastore()
const otherMFS = mfs({ blockstore: otherBlockstore, datastore: otherDatastore })
const otherCar = car({ blockstore: otherBlockstore, dagWalkers })

await otherMFS.mkdir('/testDups')
await otherMFS.mkdir('/testDups/sub')

const sourceCid = await otherUnixFS.addBytes(smallFile)
await otherMFS.cp(sourceCid, '/testDups/a.smallfile')
await otherMFS.cp(sourceCid, '/testDups/sub/b.smallfile')

const rootObject = await otherMFS.stat('/testDups/')
const rootCid = rootObject.cid

const writer = memoryCarWriter(rootCid)
const blockFilter = createScalableCuckooFilter(5)
await otherCar.export(rootCid, writer, {
blockFilter
})

const carBytes = await writer.bytes()
expect(carBytes.length).to.equal(349)
})

it('exports a car file with duplicates', async () => {
Comment on lines +122 to +149
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@achingbrain should we merge these two tests and export one with filter and one without?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a lot of duplication, it'd be good to either define it as a fixture or put it in a new test file that has a setup block that creates the car exporters, writer, etc.

const otherBlockstore = new MemoryBlockstore()
const otherUnixFS = unixfs({ blockstore: otherBlockstore })
const otherDatastore = new MemoryDatastore()
const otherMFS = mfs({ blockstore: otherBlockstore, datastore: otherDatastore })
const otherCar = car({ blockstore: otherBlockstore, dagWalkers })

await otherMFS.mkdir('/testDups')
await otherMFS.mkdir('/testDups/sub')

const sourceCid = await otherUnixFS.addBytes(smallFile)
await otherMFS.cp(sourceCid, '/testDups/a.smallfile')
await otherMFS.cp(sourceCid, '/testDups/sub/b.smallfile')

const rootObject = await otherMFS.stat('/testDups/')
const rootCid = rootObject.cid

const writer = memoryCarWriter(rootCid)
await otherCar.export(rootCid, writer)

const carBytes = await writer.bytes()
expect(carBytes.length).to.equal(399)
})
})
Loading