From 47e72d400b91e0a7852158d9917970f660b46d3d Mon Sep 17 00:00:00 2001
From: hannahhoward
Date: Wed, 29 Jan 2025 18:30:36 -0800
Subject: [PATCH] feat(locator): unify locator with separate clients

Separates locator logic from the underlying client. The client is
abstracted to behave like the indexer client, with a content claims
adaptation.

BREAKING CHANGE: content claims is now just a client, not a locator
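
Migration sketch (illustrative only -- subpaths follow the updated
"exports" map below, and construction mirrors test/fetcher.spec.js;
substitute your package specifier for <pkg>):

    // before: the content claims locator was itself a Locator
    //   import * as ContentClaimsLocator from '<pkg>/locator/content-claims'
    //   const locator = ContentClaimsLocator.create({ serviceURL })
    // after: one unified locator, parameterized by a pluggable client
    import * as Locator from '<pkg>/locator'
    import { ContentClaimsClient } from '<pkg>/locator/content-claims-client'
    const locator = Locator.create({
      client: new ContentClaimsClient({ serviceURL })
    })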
---
 package.json                                  |  16 +-
 pnpm-lock.yaml                                |  22 +-
 src/locator/content-claims-client.js          | 193 +++++++++++++
 src/locator/content-claims.js                 | 262 ------------------
 src/locator/index.js                          | 250 ++++++++++++++++-
 src/locator/indexing-service/index.js         | 239 ----------------
 test/fetcher.spec.js                          |   6 +-
 ...dexing-service.spec.js => locator.spec.js} | 180 +++++++++++-
 8 files changed, 640 insertions(+), 528 deletions(-)
 create mode 100644 src/locator/content-claims-client.js
 delete mode 100644 src/locator/content-claims.js
 delete mode 100644 src/locator/indexing-service/index.js
 rename test/{locator/indexing-service.spec.js => locator.spec.js} (70%)

diff --git a/package.json b/package.json
index 7cb98d8..a7cdbb8 100644
--- a/package.json
+++ b/package.json
@@ -43,13 +43,13 @@
       "import": "./src/fetcher/batching.js",
       "types": "./dist/src/fetcher/batching.d.ts"
     },
-    "./locator/content-claims": {
-      "import": "./src/locator/content-claims.js",
-      "types": "./dist/src/locator/content-claims.d.ts"
+    "./locator/content-claims-client": {
+      "import": "./src/locator/content-claims-client.js",
+      "types": "./dist/src/locator/content-claims-client.d.ts"
     },
-    "./locator/indexing-service": {
-      "import": "./src/locator/indexing-service/index.js",
-      "types": "./dist/src/locator/indexing-service/index.d.ts"
+    "./locator": {
+      "import": "./src/locator/index.js",
+      "types": "./dist/src/locator/index.d.ts"
     },
     "./tracing/tracing": {
       "import": "./src/tracing/tracing.js",
@@ -60,9 +60,9 @@
     "@cloudflare/workers-types": "^4.20241022.0",
     "@ipld/dag-ucan": "^3.4.0",
     "@opentelemetry/api": "^1.9.0",
-    "@storacha/indexing-service-client": "2.0.1-rc.5",
+    "@storacha/indexing-service-client": "2.0.1-rc.6",
     "@web3-storage/blob-index": "^1.0.2",
-    "@web3-storage/content-claims": "5.1.4-rc.5",
+    "@web3-storage/content-claims": "5.2.0",
     "multiformats": "^13.1.0",
     "multipart-byte-range": "^3.0.1",
     "p-defer": "^4.0.1",
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index ad56e34..d8d1950 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -18,14 +18,14 @@ importers:
         specifier: ^1.9.0
         version: 1.9.0
       '@storacha/indexing-service-client':
-        specifier: 2.0.1-rc.5
-        version: 2.0.1-rc.5
+        specifier: 2.0.1-rc.6
+        version: 2.0.1-rc.6
       '@web3-storage/blob-index':
         specifier: ^1.0.2
        version: 1.0.4
       '@web3-storage/content-claims':
-        specifier: 5.1.4-rc.5
-        version: 5.1.4-rc.5
+        specifier: 5.2.0
+        version: 5.2.0
       multiformats:
         specifier: ^13.1.0
         version: 13.1.0
@@ -232,8 +232,8 @@ packages:
   '@storacha/capabilities@0.0.0':
    resolution: {integrity: sha512-6PINXoAMrcDS0a76H+d/LM5QlrfrDSbdHOoB8YT1AwQipJfMKTDUVFhWgpIaUrNljoJ8JHh7Y0Z/xb/biKwtYg==}

-  '@storacha/indexing-service-client@2.0.1-rc.5':
-    resolution: {integrity: sha512-AOzP/PpDMkrvt89A88x5rgiAmFD0qWpMCgnQ3a3dbaWnzmmFFTfFtW3PzvoBG9edjUZwCjBLE+nVcOkrZG6JPg==}
+  '@storacha/indexing-service-client@2.0.1-rc.6':
+    resolution: {integrity: sha512-BYNde2AzjvXW/nLsy0NMTuBYa+Ar8nUDjLkZuxSw9geBP015sakiSnUvfCwqoTDMEytghVE2/4xYKmr2jJfU5Q==}

   '@storacha/one-webcrypto@1.0.1':
    resolution: {integrity: sha512-bD+vWmcgsEBqU0Dz04BR43SA03bBoLTAY29vaKasY9Oe8cb6XIP0/vkm0OS2UwKC13c8uRgFW4rjJUgDCNLejQ==}

@@ -278,8 +278,8 @@ packages:
   '@web3-storage/capabilities@17.4.1':
    resolution: {integrity: sha512-GogLfON8PZabi03CUyncBvMcCi36yQ/0iR5P8kr4pxdnZm7OuAn4sEwbEB8rTKbah5V10Vwgb3O5dh0FBgyjHg==}

-  '@web3-storage/content-claims@5.1.4-rc.5':
-    resolution: {integrity: sha512-gVY6WpUhDmoyCI9v8gFNl0ASuChBEfbjKdub5lNsMuyr5le6DQ8D95v3o2Aa042f2qFRFdRw79ryEWJfDa8Btw==}
+  '@web3-storage/content-claims@5.2.0':
+    resolution: {integrity: sha512-FV1WIfo+lL9igjkAtHU2dl2blJfoWCWnmCAO4bhFpZD2MwojUWlVHP1t6y6qL7QU1+o7Ax9tpiAWYsb1U7Pixg==}

   '@web3-storage/data-segment@5.3.0':
    resolution: {integrity: sha512-zFJ4m+pEKqtKatJNsFrk/2lHeFSbkXZ6KKXjBe7/2ayA9wAar7T/unewnOcZrrZTnCWmaxKsXWqdMFy9bXK9dw==}

@@ -1708,13 +1708,13 @@ snapshots:
       '@web3-storage/data-segment': 5.3.0
       uint8arrays: 5.1.0

-  '@storacha/indexing-service-client@2.0.1-rc.5':
+  '@storacha/indexing-service-client@2.0.1-rc.6':
     dependencies:
       '@ipld/dag-cbor': 9.2.2
       '@storacha/blob-index': 0.0.0
       '@ucanto/core': 10.2.1
       '@ucanto/interface': 10.1.1
-      '@web3-storage/content-claims': 5.1.4-rc.5
+      '@web3-storage/content-claims': 5.2.0
       multiformats: 13.3.1

   '@storacha/one-webcrypto@1.0.1': {}
@@ -1798,7 +1798,7 @@ snapshots:
       '@web3-storage/data-segment': 5.3.0
       uint8arrays: 5.1.0

-  '@web3-storage/content-claims@5.1.4-rc.5':
+  '@web3-storage/content-claims@5.2.0':
     dependencies:
       '@ucanto/client': 9.0.1
       '@ucanto/core': 10.2.1
diff --git a/src/locator/content-claims-client.js b/src/locator/content-claims-client.js
new file mode 100644
index 0000000..8dab8e2
--- /dev/null
+++ b/src/locator/content-claims-client.js
@@ -0,0 +1,193 @@
+import { DigestMap, ShardedDAGIndex } from '@web3-storage/blob-index'
+import { Assert } from '@web3-storage/content-claims/capability'
+import * as Claims from '@web3-storage/content-claims/client'
+import { withSimpleSpan } from '../tracing/tracing.js'
+import * as ed25519 from '@ucanto/principal/ed25519'
+import { base58btc } from 'multiformats/bases/base58'
+import { NotFoundError } from '../lib.js'
+import { from } from '@storacha/indexing-service-client/query-result'
+
+/**
+ * @import * as API from '../api.js'
+ * @import {Claim, IndexingServiceClient, Query, QueryError, QueryOk, Result, Kind} from '@storacha/indexing-service-client/api'
+ * @import {KnownClaimTypes, LocationClaim} from '@web3-storage/content-claims/client/api'
+ */
+
+/**
+ * @typedef {{ serviceURL?: URL, carpark?: import('@cloudflare/workers-types').R2Bucket, carparkPublicBucketURL?: URL}} LocatorOptions
+ */
+
+/**
+ * ContentClaimsClient mimics the indexing service client using the content
+ * claims service.
+ * @implements {IndexingServiceClient}
+ */
+export class ContentClaimsClient {
+  /**
+   * Multihashes of indexes seen via `assert/index` claims, so their bytes
+   * can be fetched once a location claim for them arrives.
+   * @type {DigestMap<API.MultihashDigest, boolean>}
+   */
+  #indexCids
+  /** @type {URL|undefined} */
+  #serviceURL
+  /** @type {import('@cloudflare/workers-types').R2Bucket|undefined} */
+  #carpark
+  /** @type {URL|undefined} */
+  #carparkPublicBucketURL
+  /** @type {Promise<import('@ucanto/interface').Signer>|undefined} */
+  #signer
+
+  /** @param {LocatorOptions} [options] */
+  constructor (options) {
+    this.#indexCids = new DigestMap()
+    this.#serviceURL = options?.serviceURL
+    this.#carpark = options?.carpark
+    this.#carparkPublicBucketURL = options?.carparkPublicBucketURL
+  }
+
+  // Signer for the synthesized location claims below. Memoized so issuer,
+  // audience and resource stay consistent across calls.
+  #getSigner () {
+    if (!this.#signer) {
+      this.#signer = ed25519.Signer.generate()
+    }
+    return this.#signer
+  }
+
+  /**
+   * @param {Query} q
+   * @returns {Promise<Result<QueryOk, QueryError>>}
+   */
+  async queryClaims (q) {
+    /** @type {Claim[]} */
+    const claims = []
+    /** @type {Map<string, import('@web3-storage/blob-index/types').ShardedDAGIndexView>} */
+    const indexes = new Map()
+    const kind = q.kind ?? 'standard'
+    for (const digest of q.hashes) {
+      const read = await Claims.read(digest, { serviceURL: this.#serviceURL })
+      const digestClaims = read.filter(claim => allowedClaimTypes[kind].includes(claim.type))
+      let indexBytes
+      if (digestClaims.length === 0) {
+        const backups = await this.#carparkBackup(digest)
+        if (backups) {
+          claims.push(backups.claim)
+          indexBytes = backups.indexBytes
+        }
+      } else {
+        claims.push(...digestClaims)
+        for (const claim of digestClaims) {
+          if (claim.type === 'assert/index') {
+            this.#indexCids.set(claim.index.multihash, true)
+          }
+          if (claim.type === 'assert/location' && this.#indexCids.has(Claims.contentMultihash(claim))) {
+            try {
+              const fetchRes = await fetchIndex(claim)
+              indexBytes = await fetchRes.bytes()
+            } catch (err) {
+              console.warn('unable to fetch index', err instanceof Error ? err.message : 'unknown error')
+            }
+          }
+        }
+      }
+      if (indexBytes) {
+        const decodeRes = ShardedDAGIndex.extract(indexBytes)
+        if (decodeRes.error) {
+          console.warn('failed to decode index', decodeRes.error)
+          continue
+        }
+        indexes.set(base58btc.encode(digest.bytes), decodeRes.ok)
+      }
+    }
+    return /** @type {Result<QueryOk, QueryError>} */(await from({ claims, indexes }))
+  }
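+
+  // Fallback when the claims service has no claims for a digest: if the
+  // blob exists in the R2 "carpark" bucket, synthesize a self-signed
+  // location claim pointing at its public bucket URL. For digests known to
+  // be indexes (seen earlier via an `assert/index` claim) the bytes are
+  // fetched too, so `queryClaims` can decode the ShardedDAGIndex.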
+  /**
+   * @param {API.MultihashDigest} digest
+   */
+  async #carparkBackup (digest) {
+    let indexBytes
+    if (this.#carpark === undefined || this.#carparkPublicBucketURL === undefined) {
+      return
+    }
+    if (this.#indexCids.has(digest)) {
+      const obj = await withSimpleSpan('carPark.get', this.#carpark.get, this.#carpark)(toBlobKey(digest))
+      if (!obj) {
+        return
+      }
+      indexBytes = new Uint8Array(await obj.arrayBuffer())
+    } else {
+      const obj = await this.#carpark.head(toBlobKey(digest))
+      if (!obj) {
+        return
+      }
+    }
+    return {
+      claim: await Claims.decodeDelegation(await Assert.location
+        .invoke({
+          issuer: await this.#getSigner(),
+          audience: await this.#getSigner(),
+          with: (await this.#getSigner()).did(),
+          nb: {
+            content: { digest: digest.bytes },
+            location: [
+              /** @type {API.URI} */((new URL(toBlobKey(digest), this.#carparkPublicBucketURL)).href)
+            ]
+          }
+        })
+        .delegate()),
+      indexBytes
+    }
+  }
+}
+
+/** @type {Record<Kind, Array<KnownClaimTypes>>} */
+const allowedClaimTypes = {
+  standard: ['assert/location', 'assert/partition', 'assert/inclusion', 'assert/index', 'assert/equals', 'assert/relation'],
+  index_or_location: ['assert/location', 'assert/index'],
+  location: ['assert/location']
+}
+
+/** @param {import('multiformats').MultihashDigest} digest */
+const toBlobKey = digest => {
+  const mhStr = base58btc.encode(digest.bytes)
+  return `${mhStr}/${mhStr}.blob`
+}
+
+const fetchIndex = withSimpleSpan('fetchIndex',
+  /**
+   * Fetch a blob from the passed location.
+   * @param {LocationClaim} locationClaim
+   */
+  async (locationClaim) => {
+    for (const uri of locationClaim.location) {
+      const url = new URL(uri)
+      /** @type {HeadersInit} */
+      const headers = {}
+      if (locationClaim.range) {
+        headers.Range = `bytes=${locationClaim.range.offset}-${
+          locationClaim.range.length ?
locationClaim.range.offset + locationClaim.range.length - 1 : ''}` + } + const res = await fetch(url, { headers }) + if (!res.ok || !res.body) { + console.warn(`failed to fetch ${url}: ${res.status} ${await res.text()}`) + continue + } + return res + } + + throw new NotFoundError(Claims.contentMultihash(locationClaim)) + } +) diff --git a/src/locator/content-claims.js b/src/locator/content-claims.js deleted file mode 100644 index 74478ae..0000000 --- a/src/locator/content-claims.js +++ /dev/null @@ -1,262 +0,0 @@ -// eslint-disable-next-line -import * as API from '../api.js' -import * as Claims from '@web3-storage/content-claims/client' -import { DigestMap, ShardedDAGIndex } from '@web3-storage/blob-index' -import { fetchBlob } from '../fetcher/simple.js' -import { NotFoundError } from '../lib.js' -import { base58btc } from 'multiformats/bases/base58' -import { withSimpleSpan } from '../tracing/tracing.js' -import { digest as digestMod } from 'multiformats' -/** - * @import { DID } from '@ucanto/interface' - * @import { UnknownLink } from 'multiformats' - * @import { MultihashDigest } from 'multiformats' - * @import { ShardDigest, Position } from '@web3-storage/blob-index/types' - */ - -/** - * @typedef {{ serviceURL?: URL, carpark?: import('@cloudflare/workers-types').R2Bucket, carparkPublicBucketURL?: URL}} LocatorOptions - */ -/** @implements {API.Locator} */ -export class ContentClaimsLocator { - /** - * Cached location entries. - * @type {DigestMap} - */ - #cache - - /** @type {DigestMap} */ - #knownSlices - - /** - * Multihash digests for which we have already fetched claims. - * - * Note: _only_ the digests which have been explicitly queried, for which we - * have made a content claim request. Not using `this.#cache` because reading - * a claim may cause us to add other digests to the cache that we haven't - * read claims for. - * - * Note: implemented as a Map not a Set so that we take advantage of the - * key cache that `DigestMap` provides, so we don't duplicate base58 encoded - * multihash keys. - * @type {DigestMap>} - */ - #claimFetched - /** - * @type {URL|undefined} - */ - #serviceURL - /** - * @type {import('@cloudflare/workers-types').R2Bucket|undefined} - */ - #carpark - /** - * @type {URL | Undefined} - */ - #carparkPublicBucketURL - /** - * @param {LocatorOptions} [options] - */ - constructor (options) { - this.#cache = new DigestMap() - this.#claimFetched = new DigestMap() - this.#serviceURL = options?.serviceURL - this.#carpark = options?.carpark - this.#carparkPublicBucketURL = options?.carparkPublicBucketURL - this.#knownSlices = new DigestMap() - } - - /** @param {API.MultihashDigest} digest */ - async locate (digest) { - // get the index data for this CID (CAR CID & offset) - let location = this.#cache.get(digest) - if (!location) { - // we not found the index data! - await this.#readClaims(digest) - // seeing as we just read the index for this CID we _should_ have some - // index information for it now. - location = this.#cache.get(digest) - // if not then, well, it's not found! 
- if (!location) return { error: new NotFoundError(digest) } - } - return { ok: location } - } - - /** - * - * @param {API.MultihashDigest} digest - * @param {ShardDigest} shard - * @param {Position} pos - * @returns - */ - async #readShard (digest, shard, pos) { - await this.#readClaims(shard) - let location = this.#cache.get(shard) - if (!location) { - if (this.#carpark === undefined || this.#carparkPublicBucketURL === undefined) { - return - } - const obj = await this.#carpark.head(toBlobKey(shard)) - if (!obj) { - return - } - location = { - digest: shard, - site: [{ - location: [new URL(toBlobKey(shard), this.#carparkPublicBucketURL)], - range: { offset: 0, length: obj.size } - }] - } - this.#cache.set(shard, location) - } - this.#cache.set(digest, { - digest, - site: location.site.map(s => ({ - location: s.location, - range: { - offset: s.range.offset + pos[0], - length: pos[1] - } - })) - }) - } - - /** - * - * @param {API.MultihashDigest} digest - */ - async #executeReadClaims (digest) { - const knownSlice = this.#knownSlices.get(digest) - if (knownSlice && !digestMod.equals(knownSlice.shardDigest, digest)) { - await this.#readShard(digest, knownSlice.shardDigest, knownSlice.position) - return - } - const claims = await Claims.read(digest, { serviceURL: this.#serviceURL }) - for (const claim of claims) { - if (claim.type === 'assert/location' && claim.range?.length != null) { - const location = this.#cache.get(digest) - if (location) { - location.site.push({ - location: claim.location.map(l => new URL(l)), - range: { offset: claim.range.offset, length: claim.range.length } - }) - } else { - this.#cache.set(digest, { - digest, - site: [{ - location: claim.location.map(l => new URL(l)), - range: { offset: claim.range.offset, length: claim.range.length } - }] - }) - } - } - - if (claim.type === 'assert/index') { - await this.#readClaims(claim.index.multihash) - const location = this.#cache.get(claim.index.multihash) - /** @type {Uint8Array} */ - let indexBytes - if (!location) { - if (this.#carpark === undefined) { - continue - } - const obj = await withSimpleSpan('carPark.get', this.#carpark.get, this.#carpark)(toBlobKey(claim.index.multihash)) - if (!obj) { - continue - } - indexBytes = new Uint8Array(await obj.arrayBuffer()) - } else { - const fetchRes = await fetchBlob(location) - if (fetchRes.error) { - console.warn('failed to fetch index', fetchRes.error) - continue - } - indexBytes = await fetchRes.ok.bytes() - } - - const decodeRes = ShardedDAGIndex.extract(indexBytes) - if (decodeRes.error) { - console.warn('failed to decode index', decodeRes.error) - continue - } - - const index = decodeRes.ok - for (const [shardDigest, slices] of index.shards) { - for (const [sliceDigest, position] of slices) { - this.#knownSlices.set(sliceDigest, { shardDigest, position }) - } - } - const knownSlice = this.#knownSlices.get(digest) - if (knownSlice) { - await this.#readShard(digest, knownSlice.shardDigest, knownSlice.position) - } - } - } - } - - /** - * Read claims for the passed CID and populate the cache. - * @param {API.MultihashDigest} digest - */ - async #internalReadClaims (digest) { - if (this.#claimFetched.has(digest)) { - return this.#claimFetched.get(digest) - } - const promise = this.#executeReadClaims(digest) - this.#claimFetched.set(digest, promise) - return promise - } - - /** - * Read claims for the passed CID and populate the cache. 
-   * @param {API.MultihashDigest} digest
-   */
-  #readClaims = withSimpleSpan('readClaims', this.#internalReadClaims, this)
-
-  /** @type {API.Locator['scopeToSpaces']} */
-  scopeToSpaces (spaces) {
-    return spaceFilteredLocator(this, spaces)
-  }
-}
-
-/**
- * Create a new content claims blob locator.
- * @param {LocatorOptions} [options]
- * @returns {API.Locator}
- */
-export const create = (options) => new ContentClaimsLocator(options)
-
-/** @param {import('multiformats').MultihashDigest} digest */
-const toBlobKey = digest => {
-  const mhStr = base58btc.encode(digest.bytes)
-  return `${mhStr}/${mhStr}.blob`
-}
-
-/**
- * Wraps a {@link Locator} to filter results to the given Spaces.
- *
- * @param {API.Locator} locator
- * @param {DID[]} spaces
- * @returns {API.Locator}
- */
-const spaceFilteredLocator = (locator, spaces) => ({
-  async locate (digest) {
-    const locateResult = await locator.locate(digest)
-    if (locateResult.error) {
-      return locateResult
-    } else {
-      return {
-        ok: {
-          ...locateResult.ok,
-          site: locateResult.ok.site.filter(
-            (site) =>
-              site.space && spaces.includes(site.space)
-          )
-        }
-      }
-    }
-  },
-  scopeToSpaces (spaces) {
-    return spaceFilteredLocator(this, spaces)
-  }
-})
diff --git a/src/locator/index.js b/src/locator/index.js
index ef18d23..0951de6 100644
--- a/src/locator/index.js
+++ b/src/locator/index.js
@@ -1 +1,249 @@
-export * as ContentClaimsLocator from './content-claims.js'
+import { DigestMap } from '@web3-storage/blob-index'
+import { NotFoundError } from '../lib.js'
+import { withSimpleSpan } from '../tracing/tracing.js'
+import { contentMultihash } from '@web3-storage/content-claims/client'
+
+/**
+ * @import * as API from '../api.js'
+ * @import {Kind, IndexingServiceClient as ServiceClient} from '@storacha/indexing-service-client/api'
+ * @import { DID } from '@ucanto/interface'
+ * @import { ShardDigest, Position } from '@web3-storage/blob-index/types'
+ */
+
+/**
+ * @typedef {Object} LocatorOptions
+ * @property {ServiceClient} client An Indexing Service client instance.
+ * @property {DID[]} [spaces] The Spaces to search for the content. If
+ *   missing, the locator will search all Spaces.
+ */
+
+/** @implements {API.Locator} */
+export class IndexingServiceLocator {
+  #client
+  #spaces
+
+  /**
+   * Cached location entries.
+   * @type {DigestMap<API.MultihashDigest, API.Location>}
+   */
+  #cache
+
+  /** @type {DigestMap<API.MultihashDigest, { shardDigest: ShardDigest, position: Position }>} */
+  #knownSlices
+
+  /**
+   * Known shards are location claims we have a URL for but no length. They
+   * can be combined with known slices to make a location entry, but can't
+   * be used for blob fetching on their own.
+   * @type {DigestMap<API.MultihashDigest, { digest: API.MultihashDigest, site: API.OptionalSite[] }>}
+   */
+  #knownShards
+
+  /**
+   * Multihash digests for which we have already fetched claims.
+   *
+   * Note: _only_ the digests which have been explicitly queried, for which we
+   * have made a content claim request. Not using `this.#cache` because reading
+   * a claim may cause us to add other digests to the cache that we haven't
+   * read claims for.
+   *
+   * Note: implemented as a Map not a Set so that we take advantage of the
+   * key cache that `DigestMap` provides, so we don't duplicate base58 encoded
+   * multihash keys.
+   * @type {Record<Kind, DigestMap<API.MultihashDigest, Promise<void>>>}
+   */
+  #claimFetched
+
+  /**
+   * @param {LocatorOptions} options
+   */
+  constructor ({ client, spaces }) {
+    this.#client = client
+    this.#spaces = spaces ?? []
+    this.#cache = new DigestMap()
+    this.#claimFetched = {
+      index_or_location: new DigestMap(),
+      location: new DigestMap(),
+      standard: new DigestMap()
+    }
+    this.#knownSlices = new DigestMap()
+    this.#knownShards = new DigestMap()
+  }
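+
+  // Resolution order for `locate`: (1) serve from the location cache;
+  // (2) if an index already mapped this digest to a shard + position, read
+  // just that shard's location claim; (3) otherwise run a full 'standard'
+  // claims query and retry the shard lookup with whatever was learned.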
+
+  /** @param {API.MultihashDigest} digest */
+  async locate (digest) {
+    // get the cached data for this CID (CAR CID & offset)
+    let location = this.#cache.get(digest)
+    if (!location) {
+      // no full cached data -- but perhaps we have the shard already?
+      const knownSlice = this.#knownSlices.get(digest)
+      if (knownSlice) {
+        // read the shard
+        await this.#readShard(digest, knownSlice.shardDigest, knownSlice.position)
+      } else {
+        // we don't know anything about this digest yet -- read claims for it
+        await this.#readClaims(digest, 'standard')
+        // if we now have an index, read the shard
+        const knownSlice = this.#knownSlices.get(digest)
+        if (knownSlice) {
+          // read the shard
+          await this.#readShard(digest, knownSlice.shardDigest, knownSlice.position)
+        }
+      }
+      // seeing as we just read the index for this CID we _should_ have some
+      // index information for it now.
+      location = this.#cache.get(digest)
+      // if not then, well, it's not found!
+      if (!location) return { error: new NotFoundError(digest) }
+    }
+    return { ok: location }
+  }
+
+  /**
+   * @param {API.MultihashDigest} digest
+   * @param {ShardDigest} shard
+   * @param {Position} pos
+   */
+  async #readShard (digest, shard, pos) {
+    let location = this.#getShard(shard)
+    if (!location) {
+      await this.#readClaims(shard, 'location')
+      location = this.#getShard(shard)
+      // if not then, well, it's not found!
+      if (!location) return
+    }
+    this.#cache.set(digest, {
+      digest,
+      site: location.site.map(s => ({
+        location: s.location,
+        range: {
+          offset: (s.range?.offset || 0) + pos[0],
+          length: pos[1]
+        },
+        space: s.space
+      }))
+    })
+  }
+
+  /**
+   * @param {API.MultihashDigest} shardKey
+   */
+  #getShard (shardKey) {
+    const knownShard = this.#knownShards.get(shardKey)
+    if (knownShard) {
+      return knownShard
+    }
+    return this.#cache.get(shardKey)
+  }
+
+  /**
+   * @param {API.MultihashDigest} digest
+   * @param {Kind} kind
+   */
+  async #executeReadClaims (digest, kind) {
+    const result = await this.#client.queryClaims({
+      hashes: [digest],
+      match: this.#spaces && { subject: this.#spaces },
+      kind
+    })
+
+    if (result.error) return
+
+    // process any location claims
+    for (const claim of result.ok.claims.values()) {
+      if (claim.type === 'assert/location') {
+        if (claim.range?.length != null) {
+          addOrSetLocation(this.#cache, contentMultihash(claim), {
+            location: claim.location.map(l => new URL(l)),
+            range: { offset: claim.range.offset, length: claim.range.length },
+            space: claim.space
+          })
+        } else {
+          addOrSetLocation(this.#knownShards, contentMultihash(claim), {
+            location: claim.location.map(l => new URL(l)),
+            range: claim.range ? { offset: claim.range.offset } : undefined,
+            space: claim.space
+          })
+        }
+      }
+    }
+
+    // fetch location claims for any indexes we don't have a known shard for
+    for (const claim of result.ok.claims.values()) {
+      if (claim.type === 'assert/index') {
+        const location = this.#getShard(claim.index.multihash)
+        if (!location) {
+          await this.#readClaims(claim.index.multihash, 'location')
+        }
+      }
+    }
+
+    // read any indexes in this request
+    for (const index of result.ok.indexes.values()) {
+      for (const [shardDigest, slices] of index.shards) {
+        for (const [sliceDigest, position] of slices) {
+          this.#knownSlices.set(sliceDigest, { shardDigest, position })
+        }
+      }
+    }
+  }
+
+  /**
+   * Read claims for the passed CID and populate the cache.
+   * @param {API.MultihashDigest} digest
+   * @param {Kind} kind
+   */
+  async #internalReadClaims (digest, kind) {
+    if (this.#claimFetched[kind].has(digest)) {
+      return this.#claimFetched[kind].get(digest)
+    }
+    const promise = this.#executeReadClaims(digest, kind)
+    this.#claimFetched[kind].set(digest, promise)
+    return promise
+  }
+
+  /**
+   * Read claims for the passed CID and populate the cache.
+   * @param {API.MultihashDigest} digest
+   */
+  #readClaims = withSimpleSpan('readClaims', this.#internalReadClaims, this)
+
+  /** @type {API.Locator['scopeToSpaces']} */
+  scopeToSpaces (spaces) {
+    return new IndexingServiceLocator({
+      client: this.#client,
+      spaces: [...new Set([...this.#spaces, ...spaces]).values()]
+    })
+  }
+}
+
+/**
+ * Create a new blob locator backed by the passed claims client.
+ * @param {LocatorOptions} options
+ * @returns {API.Locator}
+ */
+export const create = (options) => new IndexingServiceLocator(options)
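+
+// Usage sketch (mirrors test/fetcher.spec.js; the client may be the real
+// indexing service client or the ContentClaimsClient adapter):
+//
+//   import { Client } from '@storacha/indexing-service-client'
+//   const locator = create({ client: new Client({ serviceURL }) })
+//   const res = await locator.locate(digest)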
+
+/**
+ * Add the site to an existing cached location, or create a new cache entry
+ * for it.
+ * @template {API.OptionalSite} T
+ * @param {DigestMap<API.MultihashDigest, { digest: API.MultihashDigest, site: T[] }>} cache
+ * @param {API.MultihashDigest} digest
+ * @param {T} site
+ */
+const addOrSetLocation = (cache, digest, site) => {
+  const location = cache.get(digest)
+  if (location) {
+    location.site.push(site)
+  } else {
+    cache.set(digest, {
+      digest,
+      site: [site]
+    })
+  }
+}
diff --git a/src/locator/indexing-service/index.js b/src/locator/indexing-service/index.js
deleted file mode 100644
index a41b13c..0000000
--- a/src/locator/indexing-service/index.js
+++ /dev/null
@@ -1,239 +0,0 @@
-import { DigestMap } from '@web3-storage/blob-index'
-import { NotFoundError } from '../../lib.js'
-import { withSimpleSpan } from '../../tracing/tracing.js'
-import { contentMultihash } from '@web3-storage/content-claims/client'
-
-/**
- * @import * as API from '../../api.js'
- * @import {Kind, IndexingServiceClient as ServiceClient} from '@storacha/indexing-service-client/api'
- * @import { DID } from '@ucanto/interface'
- * @import { ShardDigest, Position } from '@web3-storage/blob-index/types'
- */
-
-/**
- * @typedef {Object} LocatorOptions
- * @property {ServiceClient} client An Indexing Service client instance.
- * @property {DID[]} [spaces] The Spaces to search for the content. If
- *   missing, the locator will search all Spaces.
- */
-
-/** @implements {IndexingServiceLocator} */
-export class IndexingServiceLocator {
-  #client
-  #spaces
-
-  /**
-   * Cached location entries.
-   * @type {DigestMap}
-   */
-  #cache
-
-  /** @type {DigestMap} */
-  #knownSlices
-
-  /**
-   * Known Shards are locations claims we have a URL for but no length. They can be combined with known
-   * slices to make a location entry, but can't be used for blob fetching on their own
-   * @type {DigestMap}
-   *
-   */
-  #knownShards
-
-  /**
-   * Multihash digests for which we have already fetched claims.
- * - * Note: _only_ the digests which have been explicitly queried, for which we - * have made a content claim request. Not using `this.#cache` because reading - * a claim may cause us to add other digests to the cache that we haven't - * read claims for. - * - * Note: implemented as a Map not a Set so that we take advantage of the - * key cache that `DigestMap` provides, so we don't duplicate base58 encoded - * multihash keys. - * @type {Record>>} - */ - #claimFetched - - /** - * - * @param {LocatorOptions} options - */ - constructor ({ client, spaces }) { - this.#client = client - this.#spaces = spaces ?? [] - this.#cache = new DigestMap() - this.#claimFetched = { - index_or_location: new DigestMap(), - location: new DigestMap(), - standard: new DigestMap() - } - this.#knownSlices = new DigestMap() - this.#knownShards = new DigestMap() - } - - /** @param {API.MultihashDigest} digest */ - async locate (digest) { - // get the cached data for this CID (CAR CID & offset) - let location = this.#cache.get(digest) - if (!location) { - // no full cached data -- but perhaps we have the shard already? - const knownSlice = this.#knownSlices.get(digest) - if (knownSlice) { - // read the shard - await this.#readShard(digest, knownSlice.shardDigest, knownSlice.position) - } else { - // nope we don't know anything really here, better read for the digest - await this.#readClaims(digest, 'standard') - // if we now have and index, read the shard - const knownSlice = this.#knownSlices.get(digest) - if (knownSlice) { - // read the shard - await this.#readShard(digest, knownSlice.shardDigest, knownSlice.position) - } - } - // seeing as we just read the index for this CID we _should_ have some - // index information for it now. - location = this.#cache.get(digest) - // if not then, well, it's not found! - if (!location) return { error: new NotFoundError(digest) } - } - return { ok: location } - } - - /** - * - * @param {API.MultihashDigest} digest - * @param {ShardDigest} shard - * @param {Position} pos - * @returns - */ - async #readShard (digest, shard, pos) { - let location = this.#getShard(shard) - if (!location) { - await this.#readClaims(shard, 'location') - location = this.#getShard(shard) - // if not then, well, it's not found! - if (!location) return - } - this.#cache.set(digest, { - digest, - site: location.site.map(s => ({ - location: s.location, - range: { - offset: (s.range?.offset || 0) + pos[0], - length: pos[1] - }, - space: s.space - })) - }) - } - - /** - * - * @param {API.MultihashDigest} shardKey - * @returns - */ - #getShard (shardKey) { - const knownShard = this.#knownShards.get(shardKey) - if (knownShard) { - return knownShard - } - return this.#cache.get(shardKey) - } - - /** - * - * @param {API.MultihashDigest} digest - * @param {Kind} kind - */ - async #executeReadClaims (digest, kind) { - const result = await this.#client.queryClaims({ - hashes: [digest], - match: this.#spaces && { subject: this.#spaces }, - kind - }) - - if (result.error) return - - // process any location claims - for (const claim of result.ok.claims.values()) { - if (claim.type === 'assert/location') { - if (claim.range?.length != null) { - addOrSetLocation(this.#cache, contentMultihash(claim), { - location: claim.location.map(l => new URL(l)), - range: { offset: claim.range.offset, length: claim.range.length }, - space: claim.space - }) - } else { - addOrSetLocation(this.#knownShards, contentMultihash(claim), { - location: claim.location.map(l => new URL(l)), - range: claim.range ? 
{ offset: claim.range.offset } : undefined, - space: claim.space - }) - } - } - } - - // read any indexes in this request - for (const index of result.ok.indexes.values()) { - for (const [shardDigest, slices] of index.shards) { - for (const [sliceDigest, position] of slices) { - this.#knownSlices.set(sliceDigest, { shardDigest, position }) - } - } - } - } - - /** - * Read claims for the passed CID and populate the cache. - * @param {API.MultihashDigest} digest - * @param {Kind} kind - */ - async #internalReadClaims (digest, kind) { - if (this.#claimFetched[kind].has(digest)) { - return this.#claimFetched[kind].get(digest) - } - const promise = this.#executeReadClaims(digest, kind) - this.#claimFetched[kind].set(digest, promise) - return promise - } - - /** - * Read claims for the passed CID and populate the cache. - * @param {API.MultihashDigest} digest - */ - #readClaims = withSimpleSpan('readClaims', this.#internalReadClaims, this) - - /** @type {API.Locator['scopeToSpaces']} */ - scopeToSpaces (spaces) { - return new IndexingServiceLocator({ - client: this.#client, - spaces: [...new Set([...this.#spaces, ...spaces]).values()] - }) - } -} - -/** - * Create a new content claims blob locator. - * @param {LocatorOptions} options - * @returns {API.Locator} - */ -export const create = (options) => new IndexingServiceLocator(options) - -/** - * @template {API.OptionalSite} T - * @param {DigestMap} cache - * @param {API.MultihashDigest} digest - * @param {T} site - */ -const addOrSetLocation = (cache, digest, site) => { - const location = cache.get(digest) - if (location) { - location.site.push(site) - } else { - cache.set(digest, { - digest, - site: [site] - }) - } -} diff --git a/test/fetcher.spec.js b/test/fetcher.spec.js index 74783a7..9e7c4ab 100644 --- a/test/fetcher.spec.js +++ b/test/fetcher.spec.js @@ -12,7 +12,6 @@ import { exporter } from 'ipfs-unixfs-exporter' import * as ed25519 from '@ucanto/principal/ed25519' import * as SimpleFetcher from '../src/fetcher/simple.js' import * as BatchingFetcher from '../src/fetcher/batching.js' -import * as ContentClaimsLocator from '../src/locator/content-claims.js' import { randomBytes, randomInt } from './helpers/random.js' import { concat } from './helpers/stream.js' import { settings } from './helpers/unixfs.js' @@ -21,8 +20,9 @@ import { generateIndexClaim, generateLocationClaim, generateLocationClaims, with import { asBlockstore } from './helpers/unixfs-exporter.js' import * as Fetch from './helpers/fetch.js' import * as Result from './helpers/result.js' -import * as IndexingServiceLocator from '../src/locator/indexing-service/index.js' +import * as IndexingServiceLocator from '../src/locator/index.js' import { Client } from '@storacha/indexing-service-client' +import { ContentClaimsClient } from '../src/locator/content-claims-client.js' // simulates cloudflare worker environment with max 6 concurrent reqs Fetch.patch({ concurrency: 6, lag: 50 }) @@ -38,7 +38,7 @@ const withContext = testfn => withBucketServer(withClaimsServer(withTestIndexer( ;[ { locatorName: 'indexer', LocatorFactory: (/** @type {Context} */ctx) => IndexingServiceLocator.create({ client: new Client({ serviceURL: ctx.indexerURL }) }) }, - { locatorName: 'content claims', LocatorFactory: (/** @type {Context} */ctx) => ContentClaimsLocator.create({ serviceURL: ctx.claimsURL }) } + { locatorName: 'content claims', LocatorFactory: (/** @type {Context} */ctx) => IndexingServiceLocator.create({ client: new ContentClaimsClient({ serviceURL: ctx.claimsURL }) }) } ].forEach(({ 
locatorName, LocatorFactory }) => { [ { name: 'simple', FetcherFactory: SimpleFetcher }, diff --git a/test/locator/indexing-service.spec.js b/test/locator.spec.js similarity index 70% rename from test/locator/indexing-service.spec.js rename to test/locator.spec.js index deb7015..c48ecb1 100644 --- a/test/locator/indexing-service.spec.js +++ b/test/locator.spec.js @@ -6,12 +6,15 @@ import * as ed25519 from '@ucanto/principal/ed25519' import { Client } from '@storacha/indexing-service-client' import * as QueryResult from '@storacha/indexing-service-client/query-result' import { Assert } from '@web3-storage/content-claims/capability' -import { createTestCID } from '../util/createTestCID.js' +import { createTestCID } from './util/createTestCID.js' import { ShardedDAGIndex } from '@web3-storage/blob-index' -import { IndexingServiceLocator } from '../../src/locator/indexing-service/index.js' -import { NotFoundError } from '../../src/lib.js' +import { IndexingServiceLocator } from '../src/locator/index.js' +import { NotFoundError } from '../src/lib.js' import { decodeDelegation } from '@web3-storage/content-claims/client' +import { sha256 } from 'multiformats/hashes/sha2' +import * as Link from 'multiformats/link' +const carCode = 0x0202 /** * @import { Suite, Result, Assert as AssertObj } from 'entail' * @import { Await } from '@ipld/dag-ucan' @@ -83,7 +86,6 @@ const digestString = 'zQmRm3SMS4EbiKYy7VeV3zqXqzyr76mq9b2zg3Tij3VhKUG' const digest = Digest.decode(base58btc.decode(digestString)) const fixturePath = path.join( import.meta.dirname, - '..', 'fixtures', `${digestString}.queryresult.car` ) @@ -267,6 +269,176 @@ export const testIndexingServiceLocator = { } }) }, + 'should fetch location claims for index claims if they are not present': async (assert) => { + const content1Link = createTestCID('content1') + // content2Link is in the same shard, but a different slice + const content2Link = createTestCID('content2') + // content3Link is in a different shard, but the same index + const content3Link = createTestCID('content3') + + const index1 = ShardedDAGIndex.create(content1Link) + const shard1Link = createTestCID('shard1') + const shard2Link = createTestCID('shard2') + index1.setSlice(shard1Link.multihash, content1Link.multihash, [110, 120]) + index1.setSlice(shard1Link.multihash, content2Link.multihash, [210, 220]) + index1.setSlice(shard2Link.multihash, content3Link.multihash, [310, 320]) + + const archive = await index1.archive() + if (!archive.ok) { + assert.fail('unable to create archive') + return + } + const indexDigest = await sha256.digest(archive.ok) + const indexLink = Link.create(carCode, indexDigest) + const indexingService = await ed25519.Signer.generate() + const firstQueryResultContents = { + claims: [ + await decodeDelegation(await Assert.index + .invoke({ + issuer: indexingService, + audience: indexingService, + with: indexingService.did(), + nb: { + content: { digest: content1Link.multihash.bytes }, + index: indexLink + } + }) + .delegate()) + ], + indexes: new Map() + } + + const indexQueryResultContents = { + claims: [ + await decodeDelegation(await Assert.location + .invoke({ + issuer: indexingService, + audience: indexingService, + with: indexingService.did(), + nb: { + content: { digest: indexLink.multihash.bytes }, + location: ['http://example.com/index/replica1'] + } + }) + .delegate()) + ], + indexes: new Map([ + ['the context id', index1] + ]) + } + + const firstShardLocationQuery = { + claims: [ + await decodeDelegation(await Assert.location + .invoke({ + issuer: 
 indexingService,
+            audience: indexingService,
+            with: indexingService.did(),
+            nb: {
+              content: { digest: shard1Link.multihash.bytes },
+              location: ['http://example.com/shard1/replica1',
+                'http://example.com/shard1/replica2']
+            }
+          })
+          .delegate())
+      ],
+      indexes: new Map()
+    }
+
+    const secondShardLocationQuery = {
+      claims: [
+        await decodeDelegation(await Assert.location
+          .invoke({
+            issuer: indexingService,
+            audience: indexingService,
+            with: indexingService.did(),
+            nb: {
+              content: { digest: shard2Link.multihash.bytes },
+              location: ['http://example.com/shard2/replica1']
+            }
+          })
+          .delegate())
+      ],
+      indexes: new Map()
+    }
+
+    const locator = new IndexingServiceLocator({
+      client: new Client({
+        fetch: stubFetch({
+          [`https://indexing.storacha.network/claims?multihash=${base58btc.encode(
+            content1Link.multihash.bytes
+          )}&kind=standard`]: () => archivedQueryResultFrom(firstQueryResultContents, assert),
+          [`https://indexing.storacha.network/claims?multihash=${base58btc.encode(
+            indexLink.multihash.bytes
+          )}&kind=location`]: () => archivedQueryResultFrom(indexQueryResultContents, assert),
+          [`https://indexing.storacha.network/claims?multihash=${base58btc.encode(
+            shard1Link.multihash.bytes
+          )}&kind=location`]: () => archivedQueryResultFrom(firstShardLocationQuery, assert),
+          [`https://indexing.storacha.network/claims?multihash=${base58btc.encode(
+            shard2Link.multihash.bytes
+          )}&kind=location`]: () => archivedQueryResultFrom(secondShardLocationQuery, assert),
+          [`https://indexing.storacha.network/claims?multihash=${base58btc.encode(
+            content2Link.multihash.bytes
+          )}&kind=standard`]: () => {
+            assert.fail(new Error('Should not have requested content2'))
+            return null
+          },
+          [`https://indexing.storacha.network/claims?multihash=${base58btc.encode(
+            content3Link.multihash.bytes
+          )}&kind=standard`]: () => {
+            assert.fail(new Error('Should not have requested content3'))
+            return null
+          }
+        })
+      })
+    })
+
+    assert.deepEqual(await locator.locate(content1Link.multihash), {
+      ok: {
+        digest: content1Link.multihash,
+        site: [
+          {
+            location: [
+              new URL('http://example.com/shard1/replica1'),
+              new URL('http://example.com/shard1/replica2')
+            ],
+            range: { offset: 110, length: 120 },
+            space: undefined
+          }
+        ]
+      }
+    })
+
+    assert.deepEqual(await locator.locate(content2Link.multihash), {
+      ok: {
+        digest: content2Link.multihash,
+        site: [
+          {
+            location: [
+              new URL('http://example.com/shard1/replica1'),
+              new URL('http://example.com/shard1/replica2')
+            ],
+            range: { offset: 210, length: 220 },
+            space: undefined
+          }
+        ]
+      }
+    })
+
+    assert.deepEqual(await locator.locate(content3Link.multihash), {
+      ok: {
+        digest: content3Link.multihash,
+        site: [
+          {
+            location: [new URL('http://example.com/shard2/replica1')],
+            range: { offset: 310, length: 320 },
+            space: undefined
+          }
+        ]
+      }
+    })
+  },
+
+  'will fetch from a separate shard in an index on a subsequent call': async (assert) => {
+    const content1Link = createTestCID('content1')
+    // content2Link is in the same shard, but a different slice