Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add implementation. Closes #1 #2

Merged
merged 56 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from 45 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
7d7f52f
docs
juliangruber May 24, 2023
efbe295
add get job from orchestrator
juliangruber May 24, 2023
9bdc77c
style
juliangruber May 24, 2023
6e91bb4
add fetch car via saturn gateway
juliangruber May 24, 2023
363c56d
add submit success to ingestor
juliangruber May 24, 2023
9069c91
fix submit retrieval
juliangruber May 24, 2023
83db140
log
juliangruber May 24, 2023
327ad17
refactor
juliangruber May 24, 2023
2662dfa
add loop
juliangruber May 24, 2023
911f11d
more stable loop
juliangruber May 24, 2023
d95b315
call `Zinnia.jobCompleted()`
juliangruber May 24, 2023
1c5b98d
collect ttfb
juliangruber May 24, 2023
378d42d
add response status
juliangruber May 24, 2023
fdc6458
abort early if res not ok
juliangruber May 24, 2023
a577f9a
refactor
juliangruber May 24, 2023
0cebf09
add read timeout
juliangruber May 24, 2023
4310754
report error activity
juliangruber May 24, 2023
2d5c611
clean up
juliangruber May 24, 2023
6206c73
refactor
juliangruber May 24, 2023
cf6dfb3
refactor
juliangruber May 24, 2023
cabc58d
start test suite
juliangruber May 24, 2023
02ed3ee
add ci
juliangruber May 24, 2023
79f9935
refactor
juliangruber May 24, 2023
00885d0
test
juliangruber May 29, 2023
71ffb22
add retries to integration test
juliangruber May 29, 2023
08c5568
fix ci
juliangruber May 29, 2023
2a4cd08
add unit test
juliangruber May 29, 2023
6dc0cc9
simplify, add unit test
juliangruber May 29, 2023
826d648
add unit test
juliangruber May 29, 2023
2c41f31
todo
juliangruber May 29, 2023
9cf1e65
todo
juliangruber May 29, 2023
ab157fc
more tests
juliangruber May 29, 2023
da0fd27
add submit more fields
juliangruber May 29, 2023
353c835
fmt
juliangruber May 29, 2023
a140c93
module.js -> main.js
juliangruber May 30, 2023
ee4ba4f
ci: add lint
juliangruber May 30, 2023
800e5b7
docs
juliangruber May 30, 2023
acb147e
add receive headers timeout
juliangruber May 30, 2023
85d2135
add `ActivityState`
juliangruber May 30, 2023
a9427e3
move `test/all.js` to `test.js`
juliangruber May 30, 2023
838e2a2
add missing throw in integration test
juliangruber May 30, 2023
ce7b5c6
refactor using async iterator syntax
juliangruber May 30, 2023
2dc0c03
fix lint
juliangruber May 31, 2023
62f8cce
allow more attempts for integration tests
juliangruber May 31, 2023
633798c
add test order, arguments and length of `fetch` calls
juliangruber May 31, 2023
dfc6707
fix submitRetrieval + test
juliangruber May 31, 2023
27e7832
fix lint
juliangruber May 31, 2023
7e0ca9f
fix tests and condition
juliangruber May 31, 2023
e97820e
update `spark-api` schema
juliangruber May 31, 2023
c157dc0
add check retrieval from api
juliangruber May 31, 2023
0cb2abe
include status code in error message
juliangruber May 31, 2023
5d514b1
`startTimeout` -> `resetTimeout`
juliangruber May 31, 2023
b04e901
make `stats` also accessible if retrieval fails
juliangruber May 31, 2023
89e3d05
use activity class
juliangruber May 31, 2023
33775e5
add assert res.ok
juliangruber May 31, 2023
5167c71
fix lint
juliangruber May 31, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
name: CI
on: [push]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- run: curl -L https://github.com/filecoin-station/zinnia/releases/download/v0.10.0/zinnia-linux-x64.tar.gz | tar -xz
- uses: actions/setup-node@v3
- run: npx standard
- run: ./zinnia run test.js
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,16 @@ SP Retrieval Checker Module

- [Roadmap](https://pl-strflt.notion.site/SPARK-Roadmap-ac729c11c49b409fbec54751d1bc6c8a)
- [API](https://github.com/filecoin-station/spark-api)

## Development

Install [Zinnia CLI](https://github.com/filecoin-station/zinnia).

```bash
$ # Lint
$ npx standard
$ # Run module
$ zinnia run main.js
$ # Test module
$ zinnia run test.js
```
134 changes: 134 additions & 0 deletions lib/spark.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/* global Zinnia */

import { assertEquals } from 'zinnia:assert'

const sleep = dt => new Promise(resolve => setTimeout(resolve, dt))

// Create activity events when we bacome operational or fail, but only once
export class ActivityState {
#ok = true

onError () {
if (this.#ok) {
this.#ok = false
Zinnia.activity.error('SPARK failed reporting retrieval')
}
}

onSuccess () {
if (!this.#ok) {
this.#ok = true
Zinnia.activity.success('SPARK retrieval reporting resumed')
}
}
}

export default class Spark {
#fetch
#activity = new ActivityState()

constructor ({ fetch = globalThis.fetch } = {}) {
this.#fetch = fetch
}

async getRetrieval () {
console.log('Geting retrieval...')
const res = await this.#fetch('https://spark.fly.dev/retrievals', {
method: 'POST'
})
const retrieval = await res.json()
console.log({ retrieval })
return retrieval
}

async fetchCAR (url) {
// TODO: Ensure these fields are persisted by `spark-api`
bajtos marked this conversation as resolved.
Show resolved Hide resolved
const stats = {
start: new Date(),
firstByte: null,
end: null,
byteLength: 0,
status: null
}
console.log('Fetching CAR...')

// Abort if no progress was made for 10 seconds
const controller = new AbortController()
const { signal } = controller
let timeout
const startTimeout = () => {
bajtos marked this conversation as resolved.
Show resolved Hide resolved
if (timeout) {
clearTimeout(timeout)
}
timeout = setTimeout(() => controller.abort(), 10_000)
}

try {
startTimeout()
const res = await this.#fetch(url, { signal })
stats.status = res.status

if (res.ok) {
startTimeout()
for await (const value of res.body) {
if (stats.firstByte === null) {
stats.firstByte = new Date()
}
stats.byteLength += value.byteLength
startTimeout()
}
}
} finally {
clearTimeout(timeout)
}

stats.end = new Date()
console.log(stats)
return stats
}

async submitRetrieval (id, stats) {
console.log('Submitting retrieval...')
const res = await this.#fetch(`https://spark.fly.dev/retrievals/${id}`, {
method: 'PATCH',
body: JSON.stringify(stats),
headers: {
'Content-Type': 'application/json'
}
})
assertEquals(res.status, 200)
console.log('Retrieval submitted')
}

async nextRetrieval () {
const retrieval = await this.getRetrieval()

let success = false
let stats
const url = `https://strn.pl/ipfs/${retrieval.cid}`
try {
stats = await this.fetchCAR(url)
success = true
this.#activity.onSuccess()
} catch (err) {
console.error(`Failed to fetch ${url}`)
console.error(err)
this.#activity.onError()
}

await this.submitRetrieval(retrieval.id, { success, ...stats })
bajtos marked this conversation as resolved.
Show resolved Hide resolved
Zinnia.jobCompleted()
juliangruber marked this conversation as resolved.
Show resolved Hide resolved
}

async run () {
while (true) {
try {
await this.nextRetrieval()
} catch (err) {
Zinnia.activity.error('SPARK failed reporting retrieval')
bajtos marked this conversation as resolved.
Show resolved Hide resolved
juliangruber marked this conversation as resolved.
Show resolved Hide resolved
console.error(err)
}
await sleep(1_000)
}
}
}
4 changes: 4 additions & 0 deletions main.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import Spark from './lib/spark.js'

const spark = new Spark()
await spark.run()
2 changes: 2 additions & 0 deletions test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
import './test/integration.js'
import './test/spark.js'
15 changes: 15 additions & 0 deletions test/integration.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import Spark from '../lib/spark.js'
import { test } from 'zinnia:test'

test('integration', async () => {
const spark = new Spark()
for (let i = 0; i < 100; i++) {
try {
await spark.nextRetrieval()
return
} catch (err) {
console.error(err)
}
bajtos marked this conversation as resolved.
Show resolved Hide resolved
}
throw new Error('No retrieval succeeded')
})
70 changes: 70 additions & 0 deletions test/spark.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import Spark from '../lib/spark.js'
import { test } from 'zinnia:test'
import { assertInstanceOf, assertEquals } from 'zinnia:assert'

test('getRetrieval', async () => {
const retrieval = { retrieval: 'retrieval' }
const requests = []
const fetch = async (url, opts) => {
requests.push({ url, opts })
return {
async json () {
return retrieval
}
}
}
const spark = new Spark({ fetch })
assertEquals(await spark.getRetrieval(), retrieval)
assertEquals(requests, [{
url: 'https://spark.fly.dev/retrievals',
opts: { method: 'POST' }
}])
})

// TODO: test more cases
test('fetchCAR', async () => {
const URL = 'url'
const requests = []
const fetch = async url => {
requests.push({ url })
return {
status: 200,
ok: true,
body: (async function * () {
yield new Uint8Array([1, 2, 3])
})()
bajtos marked this conversation as resolved.
Show resolved Hide resolved
}
}
const spark = new Spark({ fetch })
const stats = await spark.fetchCAR(URL)
assertInstanceOf(stats.start, Date)
assertInstanceOf(stats.firstByte, Date)
assertInstanceOf(stats.end, Date)
assertEquals(stats.byteLength, 3)
assertEquals(stats.status, 200)
assertEquals(requests, [{ url: URL }])
})

test('submitRetrieval', async () => {
const requests = []
const fetch = async (url, opts) => {
requests.push({ url, opts })
assertEquals(url, 'https://spark.fly.dev/retrievals/0')
assertEquals(opts.method, 'PATCH')
assertEquals(JSON.parse(opts.body), { success: true })
assertEquals(opts.headers['Content-Type'], 'application/json')
return { status: 200 }
}
const spark = new Spark({ fetch })
await spark.submitRetrieval(0, { success: true })
bajtos marked this conversation as resolved.
Show resolved Hide resolved
assertEquals(requests, [
{
url: 'https://spark.fly.dev/retrievals/0',
opts: {
method: 'PATCH',
body: JSON.stringify({ success: true }),
headers: { 'Content-Type': 'application/json' }
}
}
])
})