Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: media manager v1 #969

Merged
merged 4 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@
"@mapeo/crypto": "1.0.0-alpha.10",
"@mapeo/sqlite-indexer": "1.0.0-alpha.9",
"@sinclair/typebox": "^0.29.6",
"@sindresorhus/merge-streams": "^4.0.0",
"b4a": "^1.6.3",
"bcp-47": "^2.1.0",
"better-sqlite3": "^8.7.0",
Expand Down Expand Up @@ -203,6 +204,7 @@
"tiny-typed-emitter": "^2.1.0",
"type-fest": "^4.5.0",
"undici": "^6.13.0",
"unix-path-resolve": "^1.0.2",
"varint": "^6.0.0",
"ws": "^8.18.0",
"yauzl-promise": "^4.0.0"
Expand Down
130 changes: 130 additions & 0 deletions src/blob-store/downloader.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import { TypedEmitter } from 'tiny-typed-emitter'
import { createEntriesStream } from './entries-stream.js'
import { filePathMatchesFilter } from './utils.js'

/** @import { BlobFilter } from '../types.js' */
/** @import { THyperdriveIndex } from './hyperdrive-index.js' */

/**
 * Like hyperdrive.download() but 'live', and for multiple drives.
 *
 * Will emit an 'error' event for any unexpected errors. A consumer must attach
 * an error listener to avoid uncaught errors. Sources of errors include:
 *
 * - If the entries stream emits an error
 * - If a drive referenced in an entry is not found
 * - If core.has() throws (e.g. if hypercore is closed)
 * - If core.download().done() throws, which should not happen according to
 *   current hypercore code.
 * - If the entries stream ends unexpectedly (it should be live and not end)
 *
 * After the first error, or after destroy() is called, the downloader is
 * aborted: queued downloads and the entries stream are destroyed and no
 * further 'error' events are emitted.
 *
 * NB: unlike hyperdrive.download(), this will also download deleted and
 * previous versions of blobs. We don't currently support editing or deleting
 * blobs, so this should not be an issue, and if we do in the future,
 * downloading deleted and previous versions may be desirable behavior anyway.
 *
 * @extends {TypedEmitter<{ error: (error: Error) => void }>}
 */
export class Downloader extends TypedEmitter {
  /** @type {THyperdriveIndex} */
  #driveIndex
  /**
   * Downloads that have been started and have not yet settled.
   * @type {Set<{ done(): Promise<void>, destroy(): void }>}
   */
  #queuedDownloads = new Set()
  #entriesStream
  #processEntriesPromise
  /** Aborted by destroy() or by the first unexpected error. */
  #ac = new AbortController()
  /** Predicate (filePath) => boolean deciding whether a file is wanted. */
  #shouldDownloadFile

  /**
   * @param {THyperdriveIndex} driveIndex
   * @param {object} [options]
   * @param {BlobFilter | null} [options.filter] Filter blobs of specific types and/or sizes to download
   */
  constructor(driveIndex, { filter } = {}) {
    super()
    this.#driveIndex = driveIndex

    // Without a filter every file path is accepted.
    this.#shouldDownloadFile = filter
      ? filePathMatchesFilter.bind(null, filter)
      : () => true

    this.#entriesStream = createEntriesStream(driveIndex, { live: true })
    this.#entriesStream.once('error', this.#handleError)

    this.#ac.signal.addEventListener('abort', this.#handleAbort, { once: true })

    // Processing starts immediately; failures are routed to #handleError so
    // the promise never causes an unhandled rejection.
    this.#processEntriesPromise = this.#processEntries()
    this.#processEntriesPromise.catch(this.#handleError)
  }

  /**
   * Process entries from the entries stream - if an entry matches the filter,
   * and we don't already have it, queue it for download.
   *
   * This method never resolves: the entries stream is created live, so the
   * loop is not expected to finish, and if it does finish an error is thrown.
   * It rejects if the downloader is aborted, if a drive referenced by an
   * entry is missing, or if the entries stream ends.
   */
  async #processEntries() {
    for await (const entry of this.#entriesStream) {
      this.#ac.signal.throwIfAborted()
      const {
        driveId,
        key: filePath,
        value: { blob },
      } = entry
      if (!this.#shouldDownloadFile(filePath)) continue
      const drive = this.#driveIndex.get(driveId)
      // ERROR HANDLING: this is unexpected and should not happen
      if (!drive) throw new Error(`Drive not found: ${driveId}`)
      const blobs = await drive.getBlobs()
      // Re-check after every await: destroy() may have been called meanwhile.
      this.#ac.signal.throwIfAborted()
      await this.#processEntry(blobs.core, blob)
      this.#ac.signal.throwIfAborted()
    }
    throw new Error('Entries stream ended unexpectedly')
  }

  /**
   * Update state and queue missing entries for download
   *
   * @param {import('hypercore')} blobsCore
   * @param {{ blockOffset: number, blockLength: number, byteLength: number }} blob
   */
  async #processEntry(blobsCore, { blockOffset: start, blockLength: length }) {
    const end = start + length
    const have = await blobsCore.has(start, end)
    this.#ac.signal.throwIfAborted()
    if (have) return
    const download = blobsCore.download({ start, end })
    this.#queuedDownloads.add(download)
    // Deliberately not awaited: the download proceeds in the background while
    // further entries are processed.
    download
      .done()
      // According to the code, this should never throw.
      .catch(this.#handleError)
      .finally(() => {
        this.#queuedDownloads.delete(download)
      })
  }

  /**
   * Cancel the downloads and clean up resources.
   */
  destroy() {
    this.#ac.abort()
  }

  /**
   * Emit the first unexpected error and abort. Errors that arrive once the
   * downloader has already been aborted are ignored.
   *
   * @param {Error} error
   */
  #handleError = (error) => {
    if (this.#ac.signal.aborted) return
    this.emit('error', error)
    this.#ac.abort(error)
  }

  /**
   * Runs once when the abort signal fires: destroys queued downloads and the
   * entries stream.
   */
  #handleAbort = () => {
    for (const download of this.#queuedDownloads) download.destroy()
    this.#ac.signal.removeEventListener('abort', this.#handleAbort)
    // The 'error' listener on the entries stream is intentionally left
    // attached. #handleError ignores errors once aborted, so keeping it means
    // an error emitted while the stream is being torn down cannot surface as
    // an unhandled 'error' event. (This used to call
    // removeListener('error', this.#ac.abort), which never matched the
    // registered listener and so did nothing.)
    // queuedDownloads is likely to be empty here anyway, but clear just in case.
    this.#queuedDownloads.clear()
    this.#entriesStream.destroy()
  }
}
81 changes: 81 additions & 0 deletions src/blob-store/entries-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import SubEncoder from 'sub-encoder'
import mergeStreams from '@sindresorhus/merge-streams'
import { Transform, pipeline } from 'node:stream'
import { noop } from '../utils.js'

/** @import Hyperdrive from 'hyperdrive' */
/** @import { BlobStoreEntriesStream } from '../types.js' */
/** @import { THyperdriveIndex } from './hyperdrive-index.js' */

// Key encoding passed to `bee.createHistoryStream()` in getHistoryStream():
// hyperdrive stores file index data under the `files` sub-encoding key.
const keyEncoding = new SubEncoder('files', 'utf-8')

/**
 * Create a single stream of blob index entries merged from the history
 * streams of every drive in the index. Drives announced later through the
 * index's `add-drive` event are merged into the same stream.
 *
 * @param {THyperdriveIndex} driveIndex
 * @param {object} opts
 * @param {boolean} [opts.live=false] Passed to each drive's history stream
 * @returns {BlobStoreEntriesStream}
 */
export function createEntriesStream(driveIndex, { live = false } = {}) {
  const knownDrives = Array.from(driveIndex)
  const initialStreams = []
  for (const drive of knownDrives) {
    initialStreams.push(getHistoryStream(drive.db, { live }))
  }
  const merged = mergeStreams(initialStreams)

  /** @param {Hyperdrive} drive */
  const onAddDrive = (drive) => {
    merged.add(getHistoryStream(drive.db, { live }))
  }
  driveIndex.on('add-drive', onAddDrive)

  // Close is always emitted, so we can use it to remove the listener
  merged.once('close', () => {
    driveIndex.off('add-drive', onAddDrive)
  })

  return merged
}

/**
 * Build a history stream for one Hyperbee whose entries each carry the
 * `driveId` added by AddDriveIds.
 *
 * @param {import('hyperbee')} bee
 * @param {object} opts
 * @param {boolean} opts.live
 */
function getHistoryStream(bee, { live }) {
  // A history stream also yields old versions of files, but it is the only
  // way to get a live stream out of a Hyperbee. We currently do not support
  // edits of blobs, so the only consequence is that old versions get
  // downloaded too, which is acceptable.
  const historyOptions = {
    live,
    // `keyEncoding` is necessary because hyperdrive stores file index data
    // under the `files` sub-encoding key
    keyEncoding,
  }
  const source = bee.createHistoryStream(historyOptions)
  const driveIdTagger = new AddDriveIds(bee.core)
  // `noop` is passed as the pipeline completion callback.
  return pipeline(source, driveIdTagger, noop)
}

/**
 * Object-mode transform that copies each incoming entry and adds a `driveId`
 * property: the hex-encoded discovery key of the core it was created with.
 */
class AddDriveIds extends Transform {
  #core
  /** Hex drive id, remembered once a truthy value has been computed. */
  #driveId

  /** @param {import('hypercore')} core */
  constructor(core) {
    super({ objectMode: true })
    this.#core = core
    // May still be undefined here if the core has no discovery key yet.
    this.#driveId = core.discoveryKey?.toString('hex')
  }

  /** @type {Transform['_transform']} */
  _transform(entry, _encoding, done) {
    // Small optimization so that toString() only runs until it yields a
    // value. core.discoveryKey will always be defined by the time the stream
    // starts flowing, but can be null when the instance is first created.
    this.#driveId ||= this.#core.discoveryKey?.toString('hex')
    done(null, { ...entry, driveId: this.#driveId })
  }
}
122 changes: 122 additions & 0 deletions src/blob-store/hyperdrive-index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import b4a from 'b4a'
import { discoveryKey } from 'hypercore-crypto'
import Hyperdrive from 'hyperdrive'
import util from 'node:util'
import { TypedEmitter } from 'tiny-typed-emitter'

/** @typedef {HyperdriveIndexImpl} THyperdriveIndex */

/**
 * Index of one Hyperdrive per `blobIndex` core known to the CoreManager,
 * keyed by the hex-encoded discovery key derived from the core's key. Emits
 * `add-drive` when a drive is created for a `blobIndex` core added later.
 *
 * @extends {TypedEmitter<{ 'add-drive': (drive: Hyperdrive) => void }>}
 */
export class HyperdriveIndexImpl extends TypedEmitter {
  /** @type {Map<string, Hyperdrive>} */
  #hyperdrives = new Map()
  #writer
  #writerKey

  /**
   * @param {import('../core-manager/index.js').CoreManager} coreManager
   * @throws {Error} If none of the `blobIndex` cores matches the writer key
   */
  constructor(coreManager) {
    super()
    const corestore = new PretendCorestore({ coreManager })

    /** @param {Buffer} key */
    const openDrive = (key) =>
      // @ts-ignore - we know pretendCorestore is not actually a Corestore
      new Hyperdrive(corestore, key)

    const indexCores = coreManager.getCores('blobIndex')
    this.#writerKey = coreManager.getWriterCore('blobIndex').key

    /** @type {undefined | Hyperdrive} */
    let writerDrive
    for (const { key } of indexCores) {
      const drive = openDrive(key)
      // The drive id is derived from the discovery key
      this.#hyperdrives.set(getDiscoveryId(key), drive)
      if (key.equals(this.#writerKey)) writerDrive = drive
    }
    if (!writerDrive) {
      throw new Error('Could not find a writer for the blobIndex namespace')
    }
    this.#writer = writerDrive

    coreManager.on('add-core', ({ key, namespace }) => {
      if (namespace !== 'blobIndex') return
      // The drive id is derived from the discovery key
      const driveId = getDiscoveryId(key)
      // Skip cores that already have a drive in the index
      if (this.#hyperdrives.has(driveId)) return
      const drive = openDrive(key)
      this.#hyperdrives.set(driveId, drive)
      this.emit('add-drive', drive)
    })
  }

  /** The drive whose key equals the `blobIndex` writer core key. */
  get writer() {
    return this.#writer
  }

  /** Key of the `blobIndex` writer core. */
  get writerKey() {
    return this.#writerKey
  }

  /** Iterates over every drive currently in the index. */
  [Symbol.iterator]() {
    return this.#hyperdrives.values()
  }

  /**
   * @param {string} driveId Hex-encoded discovery key
   * @returns The matching drive, or undefined if the id is unknown
   */
  get(driveId) {
    return this.#hyperdrives.get(driveId)
  }
}

/**
 * Implements the `get()` method as used by hyperdrive-next. It returns the
 * relevant cores from the Mapeo CoreManager.
 */
class PretendCorestore {
  #coreManager

  /**
   * @param {object} options
   * @param {import('../core-manager/index.js').CoreManager} options.coreManager
   */
  constructor({ coreManager }) {
    this.#coreManager = coreManager
  }

  /**
   * Resolve a core either by public key or by name (`db` or a name that
   * contains `blobs`).
   *
   * @param {Buffer | { publicKey: Buffer } | { name: string }} opts
   * @returns {import('hypercore')<"binary", Buffer> | undefined}
   */
  get(opts) {
    // Normalize the accepted shapes: a bare buffer and a `key` property are
    // both treated as `publicKey`.
    if (b4a.isBuffer(opts)) opts = { publicKey: opts }
    if ('key' in opts) {
      // @ts-ignore
      opts.publicKey = opts.key
    }

    if ('publicKey' in opts) {
      // NB! We should always add blobIndex (Hyperbee) cores to the core manager
      // before we use them here. We would only reach the addCore path if the
      // blob core is read from the hyperbee header (before it is added to the
      // core manager)
      const knownCore = this.#coreManager.getCoreByKey(opts.publicKey)
      if (knownCore) return knownCore
      return this.#coreManager.addCore(opts.publicKey, 'blob').core
    }

    const { name } = opts
    if (name === 'db') {
      return this.#coreManager.getWriterCore('blobIndex').core
    }
    if (name.includes('blobs')) {
      return this.#coreManager.getWriterCore('blob').core
    }
    throw new Error(
      `Unsupported corestore.get() with opts ${util.inspect(opts)}`
    )
  }

  /** no-op */
  close() {}
}

/**
 * Derive the id used to look up a drive from a hypercore public key.
 *
 * @param {Buffer} key Public key of hypercore
 * @returns {string} Hex-encoded string of derived discovery key
 */
function getDiscoveryId(key) {
  const derivedKey = discoveryKey(key)
  return derivedKey.toString('hex')
}
Loading
Loading