Skip to content

Commit

Permalink
feat: send core "haves" bitfield on first connect (#254)
Browse files Browse the repository at this point in the history
  • Loading branch information
gmaclennan authored Sep 13, 2023
1 parent 8be90d2 commit 4042a8f
Show file tree
Hide file tree
Showing 17 changed files with 1,606 additions and 55 deletions.
323 changes: 294 additions & 29 deletions package-lock.json

Large diffs are not rendered by default.

13 changes: 10 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"lint": "eslint .",
"format": "prettier . --write",
"test": "npm-run-all lint build:types type test:unit test:e2e test:types",
"test:unit": "brittle \"tests/**/*.js\"",
"test:unit": "brittle \"tests/**/*.js\" \"vendor/**/*.test.js\"",
"test:e2e": "brittle \"test-e2e/**/*.js\"",
"test:types": "tsc -p test-types/tsconfig.json",
"build:types": "tsc -p tsconfig.npm.json && cpy 'src/**/*.d.ts' dist",
Expand All @@ -20,7 +20,8 @@
"protobuf": "node ./scripts/build-messages.js",
"db:generate:project": "drizzle-kit generate:sqlite --schema src/schema/project.js --out drizzle/project",
"db:generate:client": "drizzle-kit generate:sqlite --schema src/schema/client.js --out drizzle/client",
"prepack": "npm run build:types"
"prepack": "npm run build:types",
"postinstall": "patch-package"
},
"files": [
"src",
Expand Down Expand Up @@ -66,6 +67,8 @@
"@types/node": "^18.16.3",
"@types/sinonjs__fake-timers": "^8.1.2",
"@types/streamx": "^2.9.1",
"@types/varint": "^6.0.1",
"bitfield": "^4.1.0",
"brittle": "^3.2.1",
"cpy": "^10.1.0",
"cpy-cli": "^5.0.0",
Expand Down Expand Up @@ -102,6 +105,7 @@
"b4a": "^1.6.3",
"base32.js": "^0.1.0",
"better-sqlite3": "^8.3.0",
"big-sparse-array": "^1.0.3",
"compact-encoding": "^2.12.0",
"corestore": "^6.8.4",
"drizzle-orm": "0.28.2",
Expand All @@ -115,11 +119,14 @@
"multi-core-indexer": "^1.0.0-alpha.7",
"multicast-service-discovery": "^4.0.4",
"p-defer": "^4.0.0",
"patch-package": "^8.0.0",
"private-ip": "^3.0.0",
"protobufjs": "^7.2.3",
"protomux": "^3.4.1",
"quickbit-universal": "^2.2.0",
"sodium-universal": "^4.0.0",
"sub-encoder": "^2.1.1",
"tiny-typed-emitter": "^2.1.0"
"tiny-typed-emitter": "^2.1.0",
"varint": "^6.0.0"
}
}
54 changes: 54 additions & 0 deletions patches/@digidem+types+2.0.0.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
diff --git a/node_modules/@digidem/types/vendor/hypercore/index.d.ts b/node_modules/@digidem/types/vendor/hypercore/index.d.ts
index 911e4ef..438b072 100644
--- a/node_modules/@digidem/types/vendor/hypercore/index.d.ts
+++ b/node_modules/@digidem/types/vendor/hypercore/index.d.ts
@@ -8,11 +8,31 @@ interface RemoteBitfield {
get(index: number): boolean
}

-interface HypercoreExtension {
+interface CompactEncodingState {
+ start: number
+ end: number
+ buffer: Buffer | Uint8Array
+ cache: any
+}
+
+interface CompactEncoding<T> {
+ preencode(state: CompactEncodingState, val: T): void
+ encode(state: CompactEncodingState, val: T): void
+ decode(state: CompactEncodingState): T
+}
+
+interface CodecEncoding<T> {
+ encode(val: T): Buffer | Uint8Array
+ decode(buf: Buffer | Uint8Array): T
+}
+
+type Encoding<T> = CompactEncoding<T> | CodecEncoding<T>
+
+interface HypercoreExtension<T> {
name: string
- encoding: any
- send(data: Buffer | Uint8Array, peer: any): void
- broadcast(data: Buffer | Uint8Array): void
+ encoding: Encoding<T>
+ send(message: T, peer: any): void
+ broadcast(message: T): void
destroy(): void
}

@@ -185,10 +205,10 @@ declare class Hypercore<
session(options?: Hypercore.HypercoreOptions<TValueEncoding>): Hypercore
close(): Promise<void>
ready(): Promise<void>
- registerExtension(
+ registerExtension<T = Buffer | Uint8Array>(
name: string,
- handlers?: { encoding?: any; onmessage?: (buf: Buffer, peer: any) => void }
- ): HypercoreExtension
+ handlers?: { encoding?: Encoding<T>; onmessage?: (message: T, peer: any) => void }
+ ): HypercoreExtension<T>
replicate(
isInitiatorOrReplicationStream: boolean | Duplex,
opts?: { keepAlive?: boolean }
6 changes: 6 additions & 0 deletions proto/extensions.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,9 @@ message ProjectExtension {
repeated bytes authCoreKeys = 1;
repeated bytes wantCoreKeys = 2;
}

message HaveExtension {
bytes discoveryKey = 1;
uint64 start = 2;
bytes encodedBitfield = 3;
}
236 changes: 236 additions & 0 deletions src/core-manager/bitfield-rle.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
// @ts-check
// https://github.com/mafintosh/bitfield-rle/blob/31a0001/index.js
// Vendored so that we can run cross-platform tests with latest Node versions
// Modified to encode and decode Uint32Arrays

import varint from 'varint'

const isLittleEndian =
new Uint8Array(new Uint16Array([0xff]).buffer)[0] === 0xff
const isBigEndian = !isLittleEndian

// align to 4 bytes for Uint32Array output
const n = 4

class State {
/**
*
* @param {Buffer} input
* @param {Buffer | undefined} output
* @param {number} offset
*/
constructor(input, output, offset) {
this.inputOffset = 0
this.inputLength = input.length
this.input = input
this.outputOffset = offset
this.output = output
}
}

encode.bytes = 0

/**
* @param {Uint32Array} bitfield
* @param {Buffer} [buffer]
* @param {number} [offset]
*/
export function encode(bitfield, buffer, offset) {
if (!offset) offset = 0

const bitfieldBuf = Buffer.from(
bitfield.buffer,
bitfield.byteOffset,
bitfield.byteLength
)

// Encoded as little endian
if (isBigEndian) bitfieldBuf.swap32()

if (!buffer) buffer = Buffer.allocUnsafe(encodingLength(bitfieldBuf))
var state = new State(bitfieldBuf, buffer, offset)
rle(state)
encode.bytes = state.outputOffset - offset
return buffer
}

/**
* @param {Buffer} bitfield
*/
export function encodingLength(bitfield) {
var state = new State(bitfield, undefined, 0)
rle(state)
return state.outputOffset
}

decode.bytes = 0
/**
* @param {Buffer} buffer
* @param {number} [offset]
* @returns {Uint32Array}
*/
export function decode(buffer, offset) {
if (!offset) offset = 0

var bitfieldBuf = Buffer.allocUnsafe(decodingLength(buffer, offset))
var ptr = 0

while (offset < buffer.length) {
var next = varint.decode(buffer, offset)
var repeat = next & 1
var len = repeat ? (next - (next & 3)) / 4 : next / 2

offset += varint.decode.bytes || 0

if (repeat) {
bitfieldBuf.fill(next & 2 ? 255 : 0, ptr, ptr + len)
} else {
buffer.copy(bitfieldBuf, ptr, offset, offset + len)
offset += len
}

ptr += len
}

bitfieldBuf.fill(0, ptr)
decode.bytes = buffer.length - offset

if (isBigEndian) bitfieldBuf.swap32()

return new Uint32Array(
bitfieldBuf.buffer,
bitfieldBuf.byteOffset,
bitfieldBuf.byteLength / n
)
}

/**
* @param {Buffer} buffer
* @param {number} offset
*/
export function decodingLength(buffer, offset) {
if (!offset) offset = 0

var len = 0

while (offset < buffer.length) {
var next = varint.decode(buffer, offset)
offset += varint.decode.bytes || 0

var repeat = next & 1
var slice = repeat ? (next - (next & 3)) / 4 : next / 2

len += slice
if (!repeat) offset += slice
}

if (offset > buffer.length) throw new Error('Invalid RLE bitfield')

if (len & (n - 1)) return len + (n - (len & (n - 1)))

return len
}

/**
* @param {State} state
*/
function rle(state) {
var len = 0
var bits = 0
var input = state.input

// Skip trimming for now, since it was breaking re-encoding to a Uint32Array.
// Only has a small memory overhead.

// while (state.inputLength > 0 && !input[state.inputLength - 1])
// state.inputLength--

for (var i = 0; i < state.inputLength; i++) {
if (input[i] === bits) {
len++
continue
}

if (len) encodeUpdate(state, i, len, bits)

if (input[i] === 0 || input[i] === 255) {
bits = input[i]
len = 1
} else {
len = 0
}
}

if (len) encodeUpdate(state, state.inputLength, len, bits)
encodeFinal(state)
}

/**
* @param {State & { output: Buffer }} state
* @param {number} end
*/
function encodeHead(state, end) {
var headLength = end - state.inputOffset
varint.encode(2 * headLength, state.output, state.outputOffset)
state.outputOffset += varint.encode.bytes || 0
state.input.copy(state.output, state.outputOffset, state.inputOffset, end)
state.outputOffset += headLength
}

/**
* @param {State} state
*/
function encodeFinal(state) {
var headLength = state.inputLength - state.inputOffset
if (!headLength) return

if (!stateHasOutput(state)) {
state.outputOffset += headLength + varint.encodingLength(2 * headLength)
} else {
encodeHead(state, state.inputLength)
}

state.inputOffset = state.inputLength
}

/**
*
* @param {State} state
* @param {number} i
* @param {number} len
* @param {number} bit
* @returns
*/
function encodeUpdate(state, i, len, bit) {
var headLength = i - len - state.inputOffset
var headCost = headLength
? varint.encodingLength(2 * headLength) + headLength
: 0
var enc = 4 * len + (bit ? 2 : 0) + 1 // len << 2 | bit << 1 | 1
var encCost = headCost + varint.encodingLength(enc)
var baseCost =
varint.encodingLength(2 * (i - state.inputOffset)) + i - state.inputOffset

if (encCost >= baseCost) return

if (!stateHasOutput(state)) {
state.outputOffset += encCost
state.inputOffset = i
return
}

if (headLength) encodeHead(state, i - len)

varint.encode(enc, state.output, state.outputOffset)
state.outputOffset += varint.encode.bytes || 0
state.inputOffset = i
}

/**
*
* @param {State} state
* @returns {state is State & { output: Buffer }}
*/
function stateHasOutput(state) {
return !!state.output
}
17 changes: 17 additions & 0 deletions src/core-manager/compat.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { createRequire } from 'module'

const require = createRequire(import.meta.url)

// Export the appropriate version of `quickbit-universal` as the plain import
// may resolve to an older version in some environments
const universal = require('quickbit-universal')
let quickbit = universal
if (
typeof quickbit.findFirst !== 'function' ||
typeof quickbit.findLast !== 'function'
) {
// This should always load the fallback from the locally installed version
quickbit = require('quickbit-universal/fallback')
}

export { quickbit }
Loading

0 comments on commit 4042a8f

Please sign in to comment.