Skip to content

Commit

Permalink
feat: limit max download size + calculate SHA-256 checksum (#28)
Browse files Browse the repository at this point in the history
* feat: limit download size to 200 MB per task

* feat: compute SHA-256 checksum of CAR payload

* Change success definition (#33)

- success -> healthy
- remove `success`
- update test

---------

Signed-off-by: Miroslav Bajtoš <[email protected]>
Co-authored-by: Julian Gruber <[email protected]>
  • Loading branch information
bajtos and juliangruber authored Oct 10, 2023
1 parent fb07cf9 commit b38ac0b
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 17 deletions.
18 changes: 9 additions & 9 deletions lib/activity-state.js
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
/* global Zinnia */

// Create activity events when we go online or offline
// Create activity events when we become healthy or produce errors
export class ActivityState {
#ok = null
#healthy = null

onOutdatedClient () {
this.onError('SPARK is outdated. Please upgrade Filecoin Station to the latest version.')
}

onError (msg) {
if (this.#ok === null || this.#ok) {
this.#ok = false
if (this.#healthy === null || this.#healthy) {
this.#healthy = false
Zinnia.activity.error(msg ?? 'SPARK failed reporting retrieval')
}
}

onSuccess () {
if (this.#ok === null) {
this.#ok = true
onHealthy () {
if (this.#healthy === null) {
this.#healthy = true
Zinnia.activity.info('SPARK started reporting retrievals')
} else if (!this.#ok) {
this.#ok = true
} else if (!this.#healthy) {
this.#healthy = true
Zinnia.activity.info('SPARK retrieval reporting resumed')
}
}
Expand Down
1 change: 1 addition & 0 deletions lib/constants.js
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export const SPARK_VERSION = '1.4.0'
export const DELAY_BETWEEN_RETRIEVALS = 10_000
export const MAX_CAR_SIZE = 200 * 1024 * 1024 // 200 MB
93 changes: 93 additions & 0 deletions lib/deno-encoding-hex.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// deno-fmt-ignore-file
// deno-lint-ignore-file
// This code was bundled using `deno bundle` and it's not recommended to edit it manually
//
// You can re-create this file by running the following command:
// deno bundle "https://deno.land/[email protected]/encoding/hex.ts" > lib/deno-encoding-hex.js

// Shared UTF-8 encoder used by validateBinaryLike() to turn string input into bytes.
const encoder = new TextEncoder()
// Return a human-readable type name for an arbitrary value, used to build
// error messages: primitives report their `typeof`, null reports 'null',
// objects report their constructor name (falling back to 'object' when the
// value has no constructor, e.g. Object.create(null)).
function getTypeName (value) {
  const kind = typeof value
  if (kind !== 'object') return kind
  if (value === null) return 'null'
  return value?.constructor?.name ?? 'object'
}
// Normalize the accepted input types to a Uint8Array: strings are UTF-8
// encoded, Uint8Arrays pass through unchanged, ArrayBuffers are wrapped.
// Any other type raises a TypeError naming the offending type.
function validateBinaryLike (source) {
  if (typeof source === 'string') return encoder.encode(source)
  if (source instanceof Uint8Array) return source
  if (source instanceof ArrayBuffer) return new Uint8Array(source)
  throw new TypeError(
    `The input must be a Uint8Array, a string, or an ArrayBuffer. Received a value of the type ${getTypeName(source)}.`
  )
}
// Lookup table mapping a nibble (0-15) to the ASCII byte of its lower-case
// hex character; indexed by encode()/encodeHex() below.
const hexTable = new TextEncoder().encode('0123456789abcdef')
// Shared codecs used by decodeHex()/encodeHex() for string <-> byte conversion.
const textEncoder = new TextEncoder()
const textDecoder = new TextDecoder()
// Build (but do not throw) the error reported for a non-hex character.
function errInvalidByte (__byte) {
  const char = String.fromCharCode(__byte)
  return new TypeError(`Invalid byte '${char}'`)
}
// Build (but do not throw) the error reported for odd-length hex input.
function errLength () {
  const message = 'Odd length hex string'
  return new RangeError(message)
}
// Convert the ASCII code of one hex character to its 4-bit value.
// Accepts '0'-'9', 'a'-'f' and 'A'-'F'; any other code raises a TypeError.
function fromHexChar (__byte) {
  if (__byte >= 48 && __byte <= 57) return __byte - 48 // '0'-'9'
  if (__byte >= 97 && __byte <= 102) return __byte - 87 // 'a'-'f' (97 - 10)
  if (__byte >= 65 && __byte <= 70) return __byte - 55 // 'A'-'F' (65 - 10)
  throw errInvalidByte(__byte)
}
// Encode a byte array as lower-case hex, returned as a Uint8Array of ASCII
// bytes (two output bytes per input byte).
function encode (src) {
  const dst = new Uint8Array(src.length * 2)
  // Iterate over the *source* bytes. The bundled upstream code looped up to
  // dst.length, doing twice as many iterations: the surplus reads yielded
  // `undefined` and their out-of-range typed-array writes were silently
  // discarded, so output was unaffected but half the work was wasted.
  for (let i = 0; i < src.length; i++) {
    const v = src[i]
    dst[i * 2] = hexTable[v >> 4]
    dst[i * 2 + 1] = hexTable[v & 0x0f]
  }
  return dst
}
// Encode a string, Uint8Array, or ArrayBuffer as a lower-case hex string.
// Throws TypeError (via validateBinaryLike) for any other input type.
function encodeHex (src) {
  const u8 = validateBinaryLike(src)
  const dst = new Uint8Array(u8.length * 2)
  // Loop over the source length, not dst.length: the bundled upstream code
  // iterated 2x too many times; the extra writes landed out of range and
  // were silently ignored, so this fix changes no output — only wasted work.
  for (let i = 0; i < u8.length; i++) {
    const v = u8[i]
    dst[i * 2] = hexTable[v >> 4]
    dst[i * 2 + 1] = hexTable[v & 0x0f]
  }
  return textDecoder.decode(dst)
}
// Decode hex-character bytes (ASCII codes) into the raw bytes they encode.
// Throws TypeError for a non-hex character and RangeError for odd-length
// input. Note the ordering: for odd input, the dangling last character is
// validated first, so an invalid trailing byte reports TypeError rather
// than RangeError.
function decode (src) {
  // A fractional length is truncated by the Uint8Array constructor, so odd
  // input yields floor(length / 2) output slots.
  const dst = new Uint8Array(src.length / 2)
  for (let i = 0; i < dst.length; i++) {
    const a = fromHexChar(src[i * 2])
    const b = fromHexChar(src[i * 2 + 1])
    dst[i] = a << 4 | b
  }
  if (src.length % 2 === 1) {
    // Validate the leftover character before reporting the odd length.
    fromHexChar(src[dst.length * 2])
    throw errLength()
  }
  return dst
}
// Decode a hex string into the bytes it encodes. Throws TypeError for a
// non-hex character and RangeError for an odd-length string; as in decode(),
// the dangling last character of odd input is validated before the length
// error is raised.
function decodeHex (src) {
  const u8 = textEncoder.encode(src)
  // Fractional lengths are truncated by the Uint8Array constructor, so odd
  // input yields floor(length / 2) output slots.
  const dst = new Uint8Array(u8.length / 2)
  for (let i = 0; i < dst.length; i++) {
    const a = fromHexChar(u8[i * 2])
    const b = fromHexChar(u8[i * 2 + 1])
    dst[i] = a << 4 | b
  }
  if (u8.length % 2 === 1) {
    fromHexChar(u8[dst.length * 2])
    throw errLength()
  }
  return dst
}
export { encode }
export { encodeHex }
export { decode }
export { decodeHex }
38 changes: 33 additions & 5 deletions lib/spark.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
/* global Zinnia */

import { ActivityState } from './activity-state.js'
import { SPARK_VERSION, DELAY_BETWEEN_RETRIEVALS } from './constants.js'
import { SPARK_VERSION, DELAY_BETWEEN_RETRIEVALS, MAX_CAR_SIZE } from './constants.js'
import { encodeHex } from './deno-encoding-hex.js'

const sleep = dt => new Promise(resolve => setTimeout(resolve, dt))

Expand Down Expand Up @@ -46,6 +47,11 @@ export default class Spark {
}, 60_000)
}

// WebCrypto API does not support streams yet, the hashing function requires entire data
// to be provided at once. See https://github.com/w3c/webcrypto/issues/73
const carBuffer = new ArrayBuffer(0, { maxByteLength: MAX_CAR_SIZE })
const carBytes = new Uint8Array(carBuffer)

try {
resetTimeout()
const res = await this.#fetch(url, { signal })
Expand All @@ -58,8 +64,30 @@ export default class Spark {
stats.firstByteAt = new Date()
}
stats.byteLength += value.byteLength

// We want to limit how large content we are willing to download.
// 1. To make sure we don't spend too much time (and network bandwidth) on a single task,
// so that we can complete more tasks per round
// 2. Until we have streaming hashes, we need to keep the entire payload in memory, and so
// we need to put an upper limit on how much memory we consume.
if (stats.byteLength > MAX_CAR_SIZE) {
stats.carTooLarge = true
break
}

const offset = carBuffer.byteLength
carBuffer.resize(offset + value.byteLength)
carBytes.set(value, offset)

resetTimeout()
}

if (!stats.carTooLarge) {
const digest = await crypto.subtle.digest('sha-256', carBytes)
// 0x12 is the multihash code for sha2-256
// 0x20 is the digest length (0x20 = 32 bytes = 256 bits)
stats.carChecksum = '1220' + encodeHex(digest)
}
} else {
console.error('Retrieval failed with status code %s: %s',
res.status, await res.text())
Expand Down Expand Up @@ -98,13 +126,14 @@ export default class Spark {
async nextRetrieval () {
const { id: retrievalId, ...retrieval } = await this.getRetrieval()

let success = false
const stats = {
timeout: false,
startAt: new Date(),
firstByteAt: null,
endAt: null,
carTooLarge: false,
byteLength: 0,
carChecksum: null,
statusCode: null
}
const searchParams = new URLSearchParams({
Expand All @@ -114,13 +143,12 @@ export default class Spark {
const url = `ipfs://${retrieval.cid}?${searchParams.toString()}`
try {
await this.fetchCAR(url, stats)
success = true
} catch (err) {
console.error(`Failed to fetch ${url}`)
console.error(err)
}

const measurementId = await this.submitMeasurement(retrieval, { success, ...stats })
const measurementId = await this.submitMeasurement(retrieval, { ...stats })
Zinnia.jobCompleted()
return measurementId
}
Expand All @@ -129,7 +157,7 @@ export default class Spark {
while (true) {
try {
await this.nextRetrieval()
this.#activity.onSuccess()
this.#activity.onHealthy()
} catch (err) {
if (err.statusCode === 400 && err.serverMessage === 'OUTDATED CLIENT') {
this.#activity.onOutdatedClient()
Expand Down
38 changes: 35 additions & 3 deletions test/spark.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import Spark from '../lib/spark.js'
import { test } from 'zinnia:test'
import { assertInstanceOf, assertEquals, assertArrayIncludes } from 'zinnia:assert'
import { SPARK_VERSION } from '../lib/constants.js'
import { SPARK_VERSION, MAX_CAR_SIZE } from '../lib/constants.js'

test('getRetrieval', async () => {
const round = {
Expand Down Expand Up @@ -64,27 +64,60 @@ test('fetchCAR', async () => {
startAt: new Date(),
firstByteAt: null,
endAt: null,
carTooLarge: false,
byteLength: 0,
carChecksum: null,
statusCode: null
}
await spark.fetchCAR(URL, stats)
assertEquals(stats.timeout, false)
assertInstanceOf(stats.startAt, Date)
assertInstanceOf(stats.firstByteAt, Date)
assertInstanceOf(stats.endAt, Date)
assertEquals(stats.carTooLarge, false)
assertEquals(stats.byteLength, 3)
assertEquals(stats.carChecksum, '1220039058c6f2c0cb492c533b0a4d14ef77cc0f78abccced5287d84a1a2011cfb81')
assertEquals(stats.statusCode, 200)
assertEquals(requests, [{ url: URL }])
})

test('fetchCAR exceeding MAX_CAR_SIZE', async () => {
const URL = 'url'
const fetch = async url => {
return {
status: 200,
ok: true,
body: (async function * () {
const data = new Uint8Array(MAX_CAR_SIZE + 1)
data.fill(11, 0, -1)
yield data
})()
}
}
const spark = new Spark({ fetch })
const stats = {
timeout: false,
carTooLarge: false,
byteLength: 0,
carChecksum: null,
statusCode: null
}
await spark.fetchCAR(URL, stats)
assertEquals(stats.timeout, false)
assertEquals(stats.carTooLarge, true)
assertEquals(stats.byteLength, MAX_CAR_SIZE + 1)
assertEquals(stats.carChecksum, null)
assertEquals(stats.statusCode, 200)
})

test('submitRetrieval', async () => {
const requests = []
const fetch = async (url, opts) => {
requests.push({ url, opts })
return { status: 200, ok: true, async json () { return { id: 123 } } }
}
const spark = new Spark({ fetch })
await spark.submitMeasurement({ cid: 'bafytest' }, { success: true })
await spark.submitMeasurement({ cid: 'bafytest' }, {})
assertEquals(requests, [
{
url: 'https://spark.fly.dev/measurements',
Expand All @@ -94,7 +127,6 @@ test('submitRetrieval', async () => {
sparkVersion: SPARK_VERSION,
zinniaVersion: Zinnia.versions.zinnia,
cid: 'bafytest',
success: true,
participantAddress: Zinnia.walletAddress
}),
headers: { 'Content-Type': 'application/json' }
Expand Down

0 comments on commit b38ac0b

Please sign in to comment.