From ecfa37e49d95be8a79191657f6d1efb2e91d2b11 Mon Sep 17 00:00:00 2001 From: Gregor MacLennan Date: Wed, 4 Oct 2023 18:22:05 +0100 Subject: [PATCH] feat: add namespace to peer pre-have messages --- proto/buf.gen.yaml | 1 + proto/extensions.proto | 10 +++++ src/core-manager/index.js | 34 ++++++++++------- src/generated/extensions.d.ts | 16 ++++++++ src/generated/extensions.js | 67 +++++++++++++++++++++++++++++++- src/generated/extensions.ts | 72 ++++++++++++++++++++++++++++++++++- src/generated/rpc.d.ts | 13 ++++--- src/generated/rpc.js | 13 +++---- src/generated/rpc.ts | 14 ++++--- tests/core-manager.js | 37 +++++++++++------- 10 files changed, 228 insertions(+), 49 deletions(-) diff --git a/proto/buf.gen.yaml b/proto/buf.gen.yaml index d8e7ccb12..20a208862 100644 --- a/proto/buf.gen.yaml +++ b/proto/buf.gen.yaml @@ -15,3 +15,4 @@ plugins: - useOptionals=none - outputPartialMethods=true - stringEnums=true + - enumsAsLiterals=true diff --git a/proto/extensions.proto b/proto/extensions.proto index d9d228f20..344b334b4 100644 --- a/proto/extensions.proto +++ b/proto/extensions.proto @@ -11,7 +11,17 @@ message ProjectExtension { } message HaveExtension { + + enum Namespace { + auth = 0; + config = 1; + data = 2; + blobIndex = 3; + blob = 4; + } + bytes discoveryKey = 1; uint64 start = 2; bytes encodedBitfield = 3; + Namespace namespace = 4; } diff --git a/src/core-manager/index.js b/src/core-manager/index.js index bcbd2060e..84b6282ae 100644 --- a/src/core-manager/index.js +++ b/src/core-manager/index.js @@ -33,7 +33,7 @@ const CREATE_SQL = `CREATE TABLE IF NOT EXISTS ${TABLE} ( /** * @typedef {Object} Events * @property {(coreRecord: CoreRecord) => void} add-core - * @property {(coreDiscoveryId: string, peerId: string, msg: { start: number, bitfield: Uint32Array }) => void} peer-have + * @property {(namespace: Namespace, msg: { coreDiscoveryId: string, peerId: string, start: number, bitfield: Uint32Array }) => void} peer-have */ /** @@ -470,15 +470,21 @@ export class CoreManager extends TypedEmitter { } /** - * @param {HaveMsg} msg + * @param {Omit & { namespace: Namespace | 'UNRECOGNIZED' }} msg * @param {any} peer */ #handleHaveMessage(msg, peer) { - const { start, discoveryKey, bitfield } = msg + const { start, discoveryKey, bitfield, namespace } = msg + if (namespace === 'UNRECOGNIZED') return /** @type {string} */ const peerId = peer.remotePublicKey.toString('hex') - const discoveryId = discoveryKey.toString('hex') - this.emit('peer-have', discoveryId, peerId, { start, bitfield }) + const coreDiscoveryId = discoveryKey.toString('hex') + this.emit('peer-have', namespace, { + coreDiscoveryId, + peerId, + start, + bitfield, + }) } /** @@ -494,7 +500,7 @@ export class CoreManager extends TypedEmitter { peer.protomux.cork() - for (const { core } of this.#coreIndex) { + for (const { core, namespace } of this.#coreIndex) { // We want ready() rather than update() because we are only interested in local data await core.ready() if (core.length === 0) continue @@ -505,7 +511,7 @@ export class CoreManager extends TypedEmitter { // @ts-ignore - accessing internal property const bitfieldIterator = core.core.bitfield.want(0, core.length) for (const { start, bitfield } of bitfieldIterator) { - const message = { start, bitfield, discoveryKey } + const message = { start, bitfield, discoveryKey, namespace } this.#haveExtension.send(message, peer) } } @@ -519,6 +525,7 @@ export class CoreManager extends TypedEmitter { * @property {Buffer} discoveryKey * @property {number} start * @property {Uint32Array} bitfield + * @property {Namespace} namespace */ const ProjectExtensionCodec = { @@ -534,24 +541,25 @@ const ProjectExtensionCodec = { const HaveExtensionCodec = { /** @param {HaveMsg} msg */ - encode({ start, discoveryKey, bitfield }) { + encode({ start, discoveryKey, bitfield, namespace }) { const encodedBitfield = rle.encode(bitfield) - const msg = { start, discoveryKey, encodedBitfield } + const msg = { start, discoveryKey, encodedBitfield, namespace } return HaveExtension.encode(msg).finish() }, /** * @param {Buffer | Uint8Array} buf - * @returns {HaveMsg} + * @returns {Omit & { namespace: HaveMsg['namespace'] | 'UNRECOGNIZED' }} */ decode(buf) { - const { start, discoveryKey, encodedBitfield } = HaveExtension.decode(buf) + const { start, discoveryKey, encodedBitfield, namespace } = + HaveExtension.decode(buf) try { const bitfield = rle.decode(encodedBitfield) - return { start, discoveryKey, bitfield } + return { start, discoveryKey, bitfield, namespace } } catch (e) { // TODO: Log error console.error(e) - return { start, discoveryKey, bitfield: new Uint32Array() } + return { start, discoveryKey, bitfield: new Uint32Array(), namespace } } }, } diff --git a/src/generated/extensions.d.ts b/src/generated/extensions.d.ts index f1a75110b..43d4b4c46 100644 --- a/src/generated/extensions.d.ts +++ b/src/generated/extensions.d.ts @@ -12,7 +12,19 @@ export interface HaveExtension { discoveryKey: Buffer; start: number; encodedBitfield: Buffer; + namespace: HaveExtension_Namespace; } +export declare const HaveExtension_Namespace: { + readonly auth: "auth"; + readonly config: "config"; + readonly data: "data"; + readonly blobIndex: "blobIndex"; + readonly blob: "blob"; + readonly UNRECOGNIZED: "UNRECOGNIZED"; +}; +export type HaveExtension_Namespace = typeof HaveExtension_Namespace[keyof typeof HaveExtension_Namespace]; +export declare function haveExtension_NamespaceFromJSON(object: any): HaveExtension_Namespace; +export declare function haveExtension_NamespaceToNumber(object: HaveExtension_Namespace): number; export declare const ProjectExtension: { encode(message: ProjectExtension, writer?: _m0.Writer): _m0.Writer; decode(input: _m0.Reader | Uint8Array, length?: number): ProjectExtension; @@ -54,18 +66,22 @@ export declare const HaveExtension: { discoveryKey?: Buffer; start?: number; encodedBitfield?: Buffer; + namespace?: HaveExtension_Namespace; } & { discoveryKey?: Buffer; start?: number; encodedBitfield?: Buffer; + namespace?: HaveExtension_Namespace; } & { [K in Exclude]: never; }>(base?: I): HaveExtension; fromPartial]: never; }>(object: I_1): HaveExtension; }; diff --git a/src/generated/extensions.js b/src/generated/extensions.js index b4fb55bcc..c51e6dae7 100644 --- a/src/generated/extensions.js +++ b/src/generated/extensions.js @@ -1,6 +1,54 @@ /* eslint-disable */ import Long from "long"; import _m0 from "protobufjs/minimal.js"; +export var HaveExtension_Namespace = { + auth: "auth", + config: "config", + data: "data", + blobIndex: "blobIndex", + blob: "blob", + UNRECOGNIZED: "UNRECOGNIZED", +}; +export function haveExtension_NamespaceFromJSON(object) { + switch (object) { + case 0: + case "auth": + return HaveExtension_Namespace.auth; + case 1: + case "config": + return HaveExtension_Namespace.config; + case 2: + case "data": + return HaveExtension_Namespace.data; + case 3: + case "blobIndex": + return HaveExtension_Namespace.blobIndex; + case 4: + case "blob": + return HaveExtension_Namespace.blob; + case -1: + case "UNRECOGNIZED": + default: + return HaveExtension_Namespace.UNRECOGNIZED; + } +} +export function haveExtension_NamespaceToNumber(object) { + switch (object) { + case HaveExtension_Namespace.auth: + return 0; + case HaveExtension_Namespace.config: + return 1; + case HaveExtension_Namespace.data: + return 2; + case HaveExtension_Namespace.blobIndex: + return 3; + case HaveExtension_Namespace.blob: + return 4; + case HaveExtension_Namespace.UNRECOGNIZED: + default: + return -1; + } +} function createBaseProjectExtension() { return { wantCoreKeys: [], @@ -107,7 +155,12 @@ export var ProjectExtension = { }, }; function createBaseHaveExtension() { - return { discoveryKey: Buffer.alloc(0), start: 0, encodedBitfield: Buffer.alloc(0) }; + return { + discoveryKey: Buffer.alloc(0), + start: 0, + encodedBitfield: Buffer.alloc(0), + namespace: HaveExtension_Namespace.auth, + }; } export var HaveExtension = { encode: function (message, writer) { @@ -121,6 +174,9 @@ export var HaveExtension = { if (message.encodedBitfield.length !== 0) { writer.uint32(26).bytes(message.encodedBitfield); } + if (message.namespace !== HaveExtension_Namespace.auth) { + writer.uint32(32).int32(haveExtension_NamespaceToNumber(message.namespace)); + } return writer; }, decode: function (input, length) { @@ -148,6 +204,12 @@ export var HaveExtension = { } message.encodedBitfield = reader.bytes(); continue; + case 4: + if (tag !== 32) { + break; + } + message.namespace = haveExtension_NamespaceFromJSON(reader.int32()); + continue; } if ((tag & 7) === 4 || tag === 0) { break; @@ -160,11 +222,12 @@ export var HaveExtension = { return HaveExtension.fromPartial(base !== null && base !== void 0 ? base : {}); }, fromPartial: function (object) { - var _a, _b, _c; + var _a, _b, _c, _d; var message = createBaseHaveExtension(); message.discoveryKey = (_a = object.discoveryKey) !== null && _a !== void 0 ? _a : Buffer.alloc(0); message.start = (_b = object.start) !== null && _b !== void 0 ? _b : 0; message.encodedBitfield = (_c = object.encodedBitfield) !== null && _c !== void 0 ? _c : Buffer.alloc(0); + message.namespace = (_d = object.namespace) !== null && _d !== void 0 ? _d : HaveExtension_Namespace.auth; return message; }, }; diff --git a/src/generated/extensions.ts b/src/generated/extensions.ts index cf65416d3..6c3d45a00 100644 --- a/src/generated/extensions.ts +++ b/src/generated/extensions.ts @@ -15,6 +15,60 @@ export interface HaveExtension { discoveryKey: Buffer; start: number; encodedBitfield: Buffer; + namespace: HaveExtension_Namespace; +} + +export const HaveExtension_Namespace = { + auth: "auth", + config: "config", + data: "data", + blobIndex: "blobIndex", + blob: "blob", + UNRECOGNIZED: "UNRECOGNIZED", +} as const; + +export type HaveExtension_Namespace = typeof HaveExtension_Namespace[keyof typeof HaveExtension_Namespace]; + +export function haveExtension_NamespaceFromJSON(object: any): HaveExtension_Namespace { + switch (object) { + case 0: + case "auth": + return HaveExtension_Namespace.auth; + case 1: + case "config": + return HaveExtension_Namespace.config; + case 2: + case "data": + return HaveExtension_Namespace.data; + case 3: + case "blobIndex": + return HaveExtension_Namespace.blobIndex; + case 4: + case "blob": + return HaveExtension_Namespace.blob; + case -1: + case "UNRECOGNIZED": + default: + return HaveExtension_Namespace.UNRECOGNIZED; + } +} + +export function haveExtension_NamespaceToNumber(object: HaveExtension_Namespace): number { + switch (object) { + case HaveExtension_Namespace.auth: + return 0; + case HaveExtension_Namespace.config: + return 1; + case HaveExtension_Namespace.data: + return 2; + case HaveExtension_Namespace.blobIndex: + return 3; + case HaveExtension_Namespace.blob: + return 4; + case HaveExtension_Namespace.UNRECOGNIZED: + default: + return -1; + } } function createBaseProjectExtension(): ProjectExtension { @@ -125,7 +179,12 @@ export const ProjectExtension = { }; function createBaseHaveExtension(): HaveExtension { - return { discoveryKey: Buffer.alloc(0), start: 0, encodedBitfield: Buffer.alloc(0) }; + return { + discoveryKey: Buffer.alloc(0), + start: 0, + encodedBitfield: Buffer.alloc(0), + namespace: HaveExtension_Namespace.auth, + }; } export const HaveExtension = { @@ -139,6 +198,9 @@ export const HaveExtension = { if (message.encodedBitfield.length !== 0) { writer.uint32(26).bytes(message.encodedBitfield); } + if (message.namespace !== HaveExtension_Namespace.auth) { + writer.uint32(32).int32(haveExtension_NamespaceToNumber(message.namespace)); + } return writer; }, @@ -170,6 +232,13 @@ export const HaveExtension = { message.encodedBitfield = reader.bytes() as Buffer; continue; + case 4: + if (tag !== 32) { + break; + } + + message.namespace = haveExtension_NamespaceFromJSON(reader.int32()); + continue; } if ((tag & 7) === 4 || tag === 0) { break; @@ -187,6 +256,7 @@ export const HaveExtension = { message.discoveryKey = object.discoveryKey ?? Buffer.alloc(0); message.start = object.start ?? 0; message.encodedBitfield = object.encodedBitfield ?? Buffer.alloc(0); + message.namespace = object.namespace ?? HaveExtension_Namespace.auth; return message; }, }; diff --git a/src/generated/rpc.d.ts b/src/generated/rpc.d.ts index acb33baa1..a6ba5e6dd 100644 --- a/src/generated/rpc.d.ts +++ b/src/generated/rpc.d.ts @@ -14,12 +14,13 @@ export interface InviteResponse { projectKey: Buffer; decision: InviteResponse_Decision; } -export declare enum InviteResponse_Decision { - REJECT = "REJECT", - ACCEPT = "ACCEPT", - ALREADY = "ALREADY", - UNRECOGNIZED = "UNRECOGNIZED" -} +export declare const InviteResponse_Decision: { + readonly REJECT: "REJECT"; + readonly ACCEPT: "ACCEPT"; + readonly ALREADY: "ALREADY"; + readonly UNRECOGNIZED: "UNRECOGNIZED"; +}; +export type InviteResponse_Decision = typeof InviteResponse_Decision[keyof typeof InviteResponse_Decision]; export declare function inviteResponse_DecisionFromJSON(object: any): InviteResponse_Decision; export declare function inviteResponse_DecisionToNumber(object: InviteResponse_Decision): number; export interface DeviceInfo { diff --git a/src/generated/rpc.js b/src/generated/rpc.js index 1e55ceecf..9f77dfbd6 100644 --- a/src/generated/rpc.js +++ b/src/generated/rpc.js @@ -1,13 +1,12 @@ /* eslint-disable */ import _m0 from "protobufjs/minimal.js"; import { EncryptionKeys } from "./keys.js"; -export var InviteResponse_Decision; -(function (InviteResponse_Decision) { - InviteResponse_Decision["REJECT"] = "REJECT"; - InviteResponse_Decision["ACCEPT"] = "ACCEPT"; - InviteResponse_Decision["ALREADY"] = "ALREADY"; - InviteResponse_Decision["UNRECOGNIZED"] = "UNRECOGNIZED"; -})(InviteResponse_Decision || (InviteResponse_Decision = {})); +export var InviteResponse_Decision = { + REJECT: "REJECT", + ACCEPT: "ACCEPT", + ALREADY: "ALREADY", + UNRECOGNIZED: "UNRECOGNIZED", +}; export function inviteResponse_DecisionFromJSON(object) { switch (object) { case 0: diff --git a/src/generated/rpc.ts b/src/generated/rpc.ts index 1622fd63a..02249584f 100644 --- a/src/generated/rpc.ts +++ b/src/generated/rpc.ts @@ -18,12 +18,14 @@ export interface InviteResponse { decision: InviteResponse_Decision; } -export enum InviteResponse_Decision { - REJECT = "REJECT", - ACCEPT = "ACCEPT", - ALREADY = "ALREADY", - UNRECOGNIZED = "UNRECOGNIZED", -} +export const InviteResponse_Decision = { + REJECT: "REJECT", + ACCEPT: "ACCEPT", + ALREADY: "ALREADY", + UNRECOGNIZED: "UNRECOGNIZED", +} as const; + +export type InviteResponse_Decision = typeof InviteResponse_Decision[keyof typeof InviteResponse_Decision]; export function inviteResponse_DecisionFromJSON(object: any): InviteResponse_Decision { switch (object) { diff --git a/tests/core-manager.js b/tests/core-manager.js index 14916262f..cb7e2b2cf 100644 --- a/tests/core-manager.js +++ b/tests/core-manager.js @@ -481,23 +481,31 @@ test('sends "haves" bitfields over project creator core replication stream', asy const cm2 = createCoreManager({ projectKey }) /** * For each peer, indexed by peerId, a map of hypercore bitfields, indexed by discoveryId - * @type {Map>} + * @type {Map>>} */ const havesByPeer = new Map() - cm2.on('peer-have', (coreDiscoveryId, peerId, { start, bitfield }) => { - let havesByCore = havesByPeer.get(peerId) - if (!havesByCore) { - havesByCore = new Map() - havesByPeer.set(peerId, havesByCore) - } - let remoteBitfield = havesByCore.get(coreDiscoveryId) - if (!remoteBitfield) { - remoteBitfield = new RemoteBitfield() - havesByCore.set(coreDiscoveryId, remoteBitfield) + cm2.on( + 'peer-have', + (namespace, { coreDiscoveryId, peerId, start, bitfield }) => { + let havesByNamespace = havesByPeer.get(peerId) + if (!havesByNamespace) { + havesByNamespace = new Map() + havesByPeer.set(peerId, havesByNamespace) + } + let havesByCore = havesByNamespace.get(namespace) + if (!havesByCore) { + havesByCore = new Map() + havesByNamespace.set(namespace, havesByCore) + } + let remoteBitfield = havesByCore.get(coreDiscoveryId) + if (!remoteBitfield) { + remoteBitfield = new RemoteBitfield() + havesByCore.set(coreDiscoveryId, remoteBitfield) + } + remoteBitfield.insert(start, bitfield) } - remoteBitfield.insert(start, bitfield) - }) + ) const cm1Core = cm1.getWriterCore('data').core await cm1Core.ready() @@ -524,7 +532,8 @@ test('sends "haves" bitfields over project creator core replication stream', asy await new Promise((res) => setTimeout(res, 200)) const peerId = n1.publicKey.toString('hex') - const havesByCore = havesByPeer.get(peerId) + const havesByNamespace = havesByPeer.get(peerId) + const havesByCore = havesByNamespace.get('data') t.ok(havesByCore) const bitfield = havesByCore.get(cm1Core.discoveryKey.toString('hex')) t.ok(bitfield)