Skip to content

Commit

Permalink
Add implementation. Closes #1 (#2)
Browse files Browse the repository at this point in the history
* docs

* add get job from orchestrator

* style

* add fetch car via saturn gateway

* add submit success to ingestor

* fix submit retrieval

* log

* refactor

* add loop

* more stable loop

* call `Zinnia.jobCompleted()`

* collect ttfb

* add response status

* abort early if res not ok

* refactor

* add read timeout

* report error activity

* clean up

* refactor

* refactor

* start test suite

* add ci

* refactor

* test

* add retries to integration test

* fix ci

* add unit test

* simplify, add unit test

* add unit test

* todo

* todo

* more tests

* add submit more fields

* fmt

* module.js -> main.js

* ci: add lint

* docs

* add receive headers timeout

* add `ActivityState`

* move `test/all.js` to `test.js`

* add missing throw in integration test

* refactor using async iterator syntax

* fix lint

* allow more attempts for integration tests

* add test order, arguments and length of `fetch` calls

* fix submitRetrieval + test

* fix lint

* fix tests and condition

* update `spark-api` schema

* add check retrieval from api

* include status code in error message

* `startTimeout` -> `resetTimeout`

* make `stats` also accessible if retrieval fails

* use activity class

* add assert res.ok

* fix lint
  • Loading branch information
juliangruber authored May 31, 2023
1 parent 48d9d59 commit 7b4444c
Show file tree
Hide file tree
Showing 7 changed files with 260 additions and 0 deletions.
11 changes: 11 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
name: CI
on: [push]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- run: curl -L https://github.com/filecoin-station/zinnia/releases/download/v0.10.0/zinnia-linux-x64.tar.gz | tar -xz
- uses: actions/setup-node@v3
- run: npx standard
- run: ./zinnia run test.js
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,16 @@ SP Retrieval Checker Module

- [Roadmap](https://pl-strflt.notion.site/SPARK-Roadmap-ac729c11c49b409fbec54751d1bc6c8a)
- [API](https://github.com/filecoin-station/spark-api)

## Development

Install [Zinnia CLI](https://github.com/filecoin-station/zinnia).

```bash
$ # Lint
$ npx standard
$ # Run module
$ zinnia run main.js
$ # Test module
$ zinnia run test.js
```
139 changes: 139 additions & 0 deletions lib/spark.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/* global Zinnia */

const sleep = dt => new Promise(resolve => setTimeout(resolve, dt))

// Create activity events when we bacome operational or fail, but only once
export class ActivityState {
#ok = true

onError () {
if (this.#ok) {
this.#ok = false
Zinnia.activity.error('SPARK failed reporting retrieval')
}
}

onSuccess () {
if (!this.#ok) {
this.#ok = true
Zinnia.activity.success('SPARK retrieval reporting resumed')
}
}
}

export default class Spark {
#fetch
#activity = new ActivityState()

constructor ({ fetch = globalThis.fetch } = {}) {
this.#fetch = fetch
}

async getRetrieval () {
console.log('Geting retrieval...')
const res = await this.#fetch('https://spark.fly.dev/retrievals', {
method: 'POST'
})
const retrieval = await res.json()
console.log({ retrieval })
return retrieval
}

async fetchCAR (url, stats) {
console.log('Fetching CAR...')

// Abort if no progress was made for 10 seconds
const controller = new AbortController()
const { signal } = controller
let timeout
const resetTimeout = () => {
if (timeout) {
clearTimeout(timeout)
}
timeout = setTimeout(() => controller.abort(), 10_000)
}

try {
resetTimeout()
const res = await this.#fetch(url, { signal })
stats.statusCode = res.status

if (res.ok) {
resetTimeout()
for await (const value of res.body) {
if (stats.firstByteAt === null) {
stats.firstByteAt = new Date()
}
stats.byteLength += value.byteLength
resetTimeout()
}
}
} finally {
clearTimeout(timeout)
}

stats.endAt = new Date()
console.log(stats)
}

async submitRetrieval (id, stats) {
console.log('Submitting retrieval...')
const res = await this.#fetch(`https://spark.fly.dev/retrievals/${id}`, {
method: 'PATCH',
body: JSON.stringify({
...stats,
walletAddress: Zinnia.walletAddress
}),
headers: {
'Content-Type': 'application/json'
}
})
if (res.status !== 200) {
let body
try {
body = await res.text()
} catch {}
throw new Error(`Failed to submit retrieval (${res.status}): ${body}`)
}
console.log('Retrieval submitted')
}

async nextRetrieval () {
const retrieval = await this.getRetrieval()

let success = false
const stats = {
startAt: new Date(),
firstByteAt: null,
endAt: null,
byteLength: 0,
statusCode: null
}
const url = `https://strn.pl/ipfs/${retrieval.cid}`
try {
await this.fetchCAR(url, stats)
success = true
this.#activity.onSuccess()
} catch (err) {
console.error(`Failed to fetch ${url}`)
console.error(err)
this.#activity.onError()
}

await this.submitRetrieval(retrieval.id, { success, ...stats })
Zinnia.jobCompleted()
return retrieval.id
}

async run () {
while (true) {
try {
await this.nextRetrieval()
} catch (err) {
this.#activity.onError()
console.error(err)
}
await sleep(1_000)
}
}
}
4 changes: 4 additions & 0 deletions main.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import Spark from './lib/spark.js'

const spark = new Spark()
await spark.run()
2 changes: 2 additions & 0 deletions test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
import './test/integration.js'
import './test/spark.js'
13 changes: 13 additions & 0 deletions test/integration.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import Spark from '../lib/spark.js'
import { test } from 'zinnia:test'
import { assert } from 'zinnia:assert'

test('integration', async () => {
const spark = new Spark()
const id = await spark.nextRetrieval()
const res = await fetch(`https://spark.fly.dev/retrievals/${id}`)
assert(res.ok)
const retrieval = await res.json()
assert(retrieval.startAt)
assert(retrieval.finishedAt)
})
78 changes: 78 additions & 0 deletions test/spark.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/* global Zinnia */

import Spark from '../lib/spark.js'
import { test } from 'zinnia:test'
import { assertInstanceOf, assertEquals } from 'zinnia:assert'

test('getRetrieval', async () => {
const retrieval = { retrieval: 'retrieval' }
const requests = []
const fetch = async (url, opts) => {
requests.push({ url, opts })
return {
async json () {
return retrieval
}
}
}
const spark = new Spark({ fetch })
assertEquals(await spark.getRetrieval(), retrieval)
assertEquals(requests, [{
url: 'https://spark.fly.dev/retrievals',
opts: { method: 'POST' }
}])
})

// TODO: test more cases
test('fetchCAR', async () => {
const URL = 'url'
const requests = []
const fetch = async url => {
requests.push({ url })
return {
status: 200,
ok: true,
body: (async function * () {
yield new Uint8Array([1, 2, 3])
})()
}
}
const spark = new Spark({ fetch })
const stats = {
startAt: new Date(),
firstByteAt: null,
endAt: null,
byteLength: 0,
statusCode: null
}
await spark.fetchCAR(URL, stats)
assertInstanceOf(stats.startAt, Date)
assertInstanceOf(stats.firstByteAt, Date)
assertInstanceOf(stats.endAt, Date)
assertEquals(stats.byteLength, 3)
assertEquals(stats.statusCode, 200)
assertEquals(requests, [{ url: URL }])
})

test('submitRetrieval', async () => {
const requests = []
const fetch = async (url, opts) => {
requests.push({ url, opts })
return { status: 200 }
}
const spark = new Spark({ fetch })
await spark.submitRetrieval(0, { success: true })
assertEquals(requests, [
{
url: 'https://spark.fly.dev/retrievals/0',
opts: {
method: 'PATCH',
body: JSON.stringify({
success: true,
walletAddress: Zinnia.walletAddress
}),
headers: { 'Content-Type': 'application/json' }
}
}
])
})

0 comments on commit 7b4444c

Please sign in to comment.