From 0f4a0d6c76d071320477495567c4d2a510f16e28 Mon Sep 17 00:00:00 2001 From: Gregor MacLennan Date: Thu, 30 Nov 2023 15:14:54 +0900 Subject: [PATCH] WIP --- src/capabilities.js | 16 +- src/core-manager/index.js | 222 +++---------- src/core-ownership.js | 37 ++- src/datastore/index.js | 76 +++-- src/datatype/index.d.ts | 3 +- src/datatype/index.js | 5 +- src/generated/extensions.js | 527 +++++++++++++++++-------------- src/logger.js | 1 + src/mapeo-project.js | 2 + src/sync/peer-sync-controller.js | 198 +++++++----- src/sync/sync-api.js | 162 ++++++++-- test-e2e/sync.js | 110 +++---- tests/datastore.js | 4 +- 13 files changed, 738 insertions(+), 625 deletions(-) diff --git a/src/capabilities.js b/src/capabilities.js index 8b683c95c..ec6027a74 100644 --- a/src/capabilities.js +++ b/src/capabilities.js @@ -1,6 +1,7 @@ import { currentSchemaVersions } from '@mapeo/schema' import mapObject from 'map-obj' -import { kCreateWithDocId } from './datatype/index.js' +import { kCreateWithDocId, kDataStore } from './datatype/index.js' +import { TypedEmitter } from 'tiny-typed-emitter' // Randomly generated 8-byte encoded as hex export const COORDINATOR_ROLE_ID = 'f7c150f5a3a9a855' @@ -140,7 +141,15 @@ export const DEFAULT_CAPABILITIES = { }, } -export class Capabilities { +/** + * @typedef {object} CapabilitiesEvents + * @property {(docs: import('@mapeo/schema').Role[]) => void} update - Emitted when new role records are indexed + */ + +/** + * @extends {TypedEmitter} + */ +export class Capabilities extends TypedEmitter { #dataType #coreOwnership #coreManager @@ -165,11 +174,14 @@ export class Capabilities { * @param {Buffer} opts.deviceKey public key of this device */ constructor({ dataType, coreOwnership, coreManager, projectKey, deviceKey }) { + super() this.#dataType = dataType this.#coreOwnership = coreOwnership this.#coreManager = coreManager this.#projectCreatorAuthCoreId = projectKey.toString('hex') this.#ownDeviceId = deviceKey.toString('hex') + + dataType[kDataStore].on('role', this.emit.bind(this, 'update')) } /** diff --git a/src/core-manager/index.js b/src/core-manager/index.js index 9168449a1..788c6fccb 100644 --- a/src/core-manager/index.js +++ b/src/core-manager/index.js @@ -6,9 +6,6 @@ import { HaveExtension, ProjectExtension } from '../generated/extensions.js' import { CoreIndex } from './core-index.js' import * as rle from './bitfield-rle.js' import { Logger } from '../logger.js' -import { keyToId } from '../utils.js' -import { discoveryKey } from 'hypercore-crypto' -import Hypercore from 'hypercore' export const kCoreManagerReplicate = Symbol('replicate core manager') // WARNING: Changing these will break things for existing apps, since namespaces @@ -55,12 +52,6 @@ export class CoreManager extends TypedEmitter { #haveExtension #deviceId #l - /** - * We use this to reduce network traffic caused by requesting the same key - * from multiple clients. - * TODO: Remove items from this set after a max age - */ - #keyRequests = new TrackedKeyRequests() #autoDownload static get namespaces() { @@ -155,8 +146,8 @@ export class CoreManager extends TypedEmitter { 'mapeo/project', { encoding: ProjectExtensionCodec, - onmessage: (msg, peer) => { - this.#handleProjectMessage(msg, peer) + onmessage: (msg) => { + this.#handleProjectMessage(msg) }, } ) @@ -168,16 +159,6 @@ export class CoreManager extends TypedEmitter { }, }) - this.#creatorCore.on('peer-add', (peer) => { - this.#sendHaves(peer) - }) - this.#creatorCore.on('peer-remove', (peer) => { - // When a peer is removed we clean up any unanswered key requests, so that - // we will request from a different peer, and to avoid the tracking of key - // requests growing without bounds. - this.#keyRequests.deleteByPeerKey(peer.remotePublicKey) - }) - this.#ready = Promise.all( [...this.#coreIndex].map(({ core }) => core.ready()) ) @@ -253,7 +234,6 @@ export class CoreManager extends TypedEmitter { */ async close() { this.#state = 'closing' - this.#keyRequests.clear() const promises = [] for (const { core } of this.#coreIndex) { promises.push(core.close()) @@ -342,69 +322,39 @@ export class CoreManager extends TypedEmitter { } /** - * Send an extension message over the project creator core replication stream - * requesting a core key for the given discovery key. - * - * @param {Buffer} peerKey - * @param {Buffer} discoveryKey + * @param {ProjectExtension} msg */ - requestCoreKey(peerKey, discoveryKey) { - // No-op if we already have this core - if (this.getCoreByDiscoveryKey(discoveryKey)) return - const peer = this.#creatorCore.peers.find((peer) => { - return peer.remotePublicKey.equals(peerKey) - }) - if (!peer) { - // This should not happen because this is only called from SyncApi, which - // checks the peer exists before calling this method. - this.#l.log( - 'Attempted to request core key for %h, but no connected peer %h', - discoveryKey, - peerKey - ) - return + #handleProjectMessage({ authCoreKeys }) { + for (const coreKey of authCoreKeys) { + // Use public method - these must be persisted (private method defaults to persisted=false) + this.addCore(coreKey, 'auth') } - // Only request a key once, e.g. from the peer we first receive it from (we - // can assume that a peer must have the key if we see the discovery key in - // the protomux). This is necessary to reduce network traffic for many newly - // connected peers - otherwise duplicate requests will be sent to every peer - if (this.#keyRequests.has(discoveryKey)) return - this.#keyRequests.set(discoveryKey, peerKey) + } - this.#l.log( - 'Requesting core key for discovery key %h from peer %h', - discoveryKey, - peerKey - ) - const message = ProjectExtension.fromPartial({ - wantCoreKeys: [discoveryKey], - }) - this.#projectExtension.send(message, peer) + /** + * Sends auth core keys to the given peer, skipping any keys that we know the + * peer has already (depends on the peer having already replicated the auth + * cores it has) + * + * @param {any} peer + */ + sendAuthCoreKeys(peer) { + this.#sendCoreKeys(peer, ['auth']) } /** - * @param {ProjectExtension} msg + * We only send non-auth core keys to a peer for unit tests * @param {any} peer + * @param {Readonly} namespaces */ - #handleProjectMessage({ wantCoreKeys, ...coreKeys }, peer) { + #sendCoreKeys(peer, namespaces) { const message = ProjectExtension.create() - let hasKeys = false - for (const discoveryKey of wantCoreKeys) { - const coreRecord = this.getCoreByDiscoveryKey(discoveryKey) - if (!coreRecord) continue - message[`${coreRecord.namespace}CoreKeys`].push(coreRecord.key) - hasKeys = true - } - if (hasKeys) { - this.#projectExtension.send(message, peer) - } - for (const namespace of NAMESPACES) { - for (const coreKey of coreKeys[`${namespace}CoreKeys`]) { - // Use public method - these must be persisted (private method defaults to persisted=false) - this.addCore(coreKey, namespace) - this.#keyRequests.deleteByDiscoveryKey(discoveryKey(coreKey)) + for (const ns of namespaces) { + for (const { key } of this.getCores(ns)) { + message[`${ns}CoreKeys`].push(key) } } + this.#projectExtension.send(message, peer) } /** @@ -426,21 +376,17 @@ export class CoreManager extends TypedEmitter { } /** - * * @param {any} peer + * @param {Exclude} namespace */ - async #sendHaves(peer) { - if (!peer) { - console.warn('sendHaves no peer', peer.remotePublicKey) - // TODO: How to handle this and when does it happen? - return - } + async sendHaves(peer, namespace) { + // We want ready() rather than update() because we are only interested in + // local data. This waits for all cores to be ready. + await this.ready() peer.protomux.cork() - for (const { core, namespace } of this.#coreIndex) { - // We want ready() rather than update() because we are only interested in local data - await core.ready() + for (const { core } of this.getCores(namespace)) { if (core.length === 0) continue const { discoveryKey } = core // This will always be defined after ready(), but need to let TS know @@ -465,18 +411,14 @@ export class CoreManager extends TypedEmitter { * @returns */ [kCoreManagerReplicate](stream) { - const protocolStream = Hypercore.createProtocolStream(stream, { - ondiscoverykey: async (discoveryKey) => { - const peer = await findPeer( - this.creatorCore, - // @ts-ignore - protocolStream.noiseStream.remotePublicKey - ) - if (!peer) return - this.requestCoreKey(peer.remotePublicKey, discoveryKey) - }, + const protocolStream = this.#corestore.replicate(stream) + this.#creatorCore.on('peer-add', (peer) => { + // Normally only auth core keys are sent, but for unit tests we need to + // send all of them, because unit tests don't include the Sync API which + // adds cores from core ownership records. + this.#sendCoreKeys(peer, NAMESPACES) }) - return this.#corestore.replicate(stream) + return protocolStream } } @@ -523,93 +465,3 @@ const HaveExtensionCodec = { } }, } - -class TrackedKeyRequests { - /** @type {Map} */ - #byDiscoveryId = new Map() - /** @type {Map>} */ - #byPeerId = new Map() - - /** - * @param {Buffer} discoveryKey - * @param {Buffer} peerKey - */ - set(discoveryKey, peerKey) { - const discoveryId = keyToId(discoveryKey) - const peerId = keyToId(peerKey) - const existingForPeer = this.#byPeerId.get(peerId) || new Set() - this.#byDiscoveryId.set(discoveryId, peerId) - existingForPeer.add(discoveryId) - this.#byPeerId.set(peerId, existingForPeer) - return this - } - /** - * @param {Buffer} discoveryKey - */ - has(discoveryKey) { - const discoveryId = keyToId(discoveryKey) - return this.#byDiscoveryId.has(discoveryId) - } - /** - * @param {Buffer} discoveryKey - */ - deleteByDiscoveryKey(discoveryKey) { - const discoveryId = keyToId(discoveryKey) - const peerId = this.#byDiscoveryId.get(discoveryId) - if (!peerId) return false - this.#byDiscoveryId.delete(discoveryId) - const existingForPeer = this.#byPeerId.get(peerId) - if (existingForPeer) { - existingForPeer.delete(discoveryId) - } - return true - } - /** - * @param {Buffer} peerKey - */ - deleteByPeerKey(peerKey) { - const peerId = keyToId(peerKey) - const existingForPeer = this.#byPeerId.get(peerId) - if (!existingForPeer) return - for (const discoveryId of existingForPeer) { - this.#byDiscoveryId.delete(discoveryId) - } - this.#byPeerId.delete(peerId) - } - clear() { - this.#byDiscoveryId.clear() - this.#byPeerId.clear() - } -} - -/** - * @param {Hypercore<"binary", Buffer>} core - * @param {Buffer} publicKey - * @param {{ timeout?: number }} [opts] - */ -function findPeer(core, publicKey, { timeout = 200 } = {}) { - const peer = core.peers.find((peer) => { - return peer.remotePublicKey.equals(publicKey) - }) - if (peer) return peer - // This is called from the from the handleDiscoveryId event, which can - // happen before the peer connection is fully established, so we wait for - // the `peer-add` event, with a timeout in case the peer never gets added - return new Promise(function (res) { - const timeoutId = setTimeout(function () { - core.off('peer-add', onPeer) - res(null) - }, timeout) - - core.on('peer-add', onPeer) - - /** @param {any} peer */ - function onPeer(peer) { - if (peer.remotePublicKey.equals(publicKey)) { - clearTimeout(timeoutId) - core.off('peer-add', onPeer) - res(peer) - } - } - }) -} diff --git a/src/core-ownership.js b/src/core-ownership.js index 3a4608982..415308919 100644 --- a/src/core-ownership.js +++ b/src/core-ownership.js @@ -4,17 +4,31 @@ import { parseVersionId } from '@mapeo/schema' import { defaultGetWinner } from '@mapeo/sqlite-indexer' import assert from 'node:assert' import sodium from 'sodium-universal' -import { kTable, kSelect, kCreateWithDocId } from './datatype/index.js' +import { + kTable, + kSelect, + kCreateWithDocId, + kDataStore, +} from './datatype/index.js' import { eq, or } from 'drizzle-orm' import mapObject from 'map-obj' import { discoveryKey } from 'hypercore-crypto' import pDefer from 'p-defer' +import { TypedEmitter } from 'tiny-typed-emitter' /** * @typedef {import('./types.js').CoreOwnershipWithSignatures} CoreOwnershipWithSignatures */ -export class CoreOwnership { +/** + * @typedef {object} CoreOwnershipEvents + * @property {(docs: import('@mapeo/schema').CoreOwnership[]) => void} update - Emitted when new coreOwnership records are indexed + */ + +/** + * @extends {TypedEmitter} + */ +export class CoreOwnership extends TypedEmitter { #dataType #ownershipWriteDone /** @@ -31,11 +45,14 @@ export class CoreOwnership { * @param {import('./types.js').KeyPair} opts.identityKeypair */ constructor({ dataType, coreKeypairs, identityKeypair }) { + super() this.#dataType = dataType - const authWriterCore = dataType.writerCore + const authWriterCore = dataType[kDataStore].writerCore const deferred = pDefer() this.#ownershipWriteDone = deferred.promise + dataType[kDataStore].on('coreOwnership', this.emit.bind(this, 'update')) + const writeOwnership = () => { if (authWriterCore.length > 0) { deferred.resolve() @@ -49,7 +66,7 @@ export class CoreOwnership { if (authWriterCore.opened) { writeOwnership() } else { - authWriterCore.on('ready', writeOwnership) + authWriterCore.once('ready', writeOwnership) } } @@ -75,17 +92,23 @@ export class CoreOwnership { } /** - * * @param {string} deviceId * @param {typeof NAMESPACES[number]} namespace * @returns {Promise} coreId of core belonging to `deviceId` for `namespace` */ async getCoreId(deviceId, namespace) { - await this.#ownershipWriteDone - const result = await this.#dataType.getByDocId(deviceId) + const result = await this.get(deviceId) return result[`${namespace}CoreId`] } + /** + * @param {string} deviceId + */ + async get(deviceId) { + await this.#ownershipWriteDone + return this.#dataType.getByDocId(deviceId) + } + /** * * @param {import('./types.js').KeyPair} identityKeypair diff --git a/src/datastore/index.js b/src/datastore/index.js index c02bb098d..3f5dbc1e8 100644 --- a/src/datastore/index.js +++ b/src/datastore/index.js @@ -1,23 +1,22 @@ -import { TypedEmitter } from 'tiny-typed-emitter' -import { encode, decode, getVersionId, parseVersionId } from '@mapeo/schema' +import { + encode, + decode, + getVersionId, + parseVersionId, + decodeBlockPrefix, +} from '@mapeo/schema' import MultiCoreIndexer from 'multi-core-indexer' import pDefer from 'p-defer' import { discoveryKey } from 'hypercore-crypto' +import { TypedEmitter } from 'tiny-typed-emitter' -/** - * @typedef {import('multi-core-indexer').IndexEvents} IndexEvents - */ /** * @typedef {import('@mapeo/schema').MapeoDoc} MapeoDoc */ /** * @typedef {import('../datatype/index.js').MapeoDocTablesMap} MapeoDocTablesMap */ -/** - * @typedef {object} DefaultEmitterEvents - * @property {(eventName: keyof IndexEvents, listener: (...args: any[]) => any) => void} newListener - * @property {(eventName: keyof IndexEvents, listener: (...args: any[]) => any) => void} removeListener - */ + /** * @template T * @template {keyof any} K @@ -34,10 +33,17 @@ const NAMESPACE_SCHEMAS = /** @type {const} */ ({ * @typedef {typeof NAMESPACE_SCHEMAS} NamespaceSchemas */ +/** + * @template {MapeoDoc['schemaName']} TSchemaName + * @typedef {{ + * [S in TSchemaName]: (docs: Extract[]) => void + * }} DataStoreEvents + */ + /** * @template {keyof NamespaceSchemas} [TNamespace=keyof NamespaceSchemas] * @template {NamespaceSchemas[TNamespace][number]} [TSchemaName=NamespaceSchemas[TNamespace][number]] - * @extends {TypedEmitter} + * @extends {TypedEmitter>} */ export class DataStore extends TypedEmitter { #coreManager @@ -49,6 +55,7 @@ export class DataStore extends TypedEmitter { #pendingIndex = new Map() /** @type {Set['promise']>} */ #pendingAppends = new Set() + #emitUpdates /** * @param {object} opts @@ -56,12 +63,14 @@ export class DataStore extends TypedEmitter { * @param {TNamespace} opts.namespace * @param {(entries: MultiCoreIndexer.Entry<'binary'>[]) => Promise} opts.batch * @param {MultiCoreIndexer.StorageParam} opts.storage + * @param {TSchemaName[]} [opts.emitUpdates] - List of schemas to emit updates for. Emitting an update is expensive because it requires decoding an entry, so should only be done schemas that will not have many documents in the database */ - constructor({ coreManager, namespace, batch, storage }) { + constructor({ coreManager, namespace, batch, storage, emitUpdates }) { super() this.#coreManager = coreManager this.#namespace = namespace this.#batch = batch + this.#emitUpdates = emitUpdates && new Set(emitUpdates) this.#writerCore = coreManager.getWriterCore(namespace).core const cores = coreManager.getCores(namespace).map((cr) => cr.core) this.#coreIndexer = new MultiCoreIndexer(cores, { @@ -72,16 +81,6 @@ export class DataStore extends TypedEmitter { if (coreRecord.namespace !== namespace) return this.#coreIndexer.addCore(coreRecord.core) }) - - // Forward events from coreIndexer - this.on('newListener', (eventName, listener) => { - if (['newListener', 'removeListener'].includes(eventName)) return - this.#coreIndexer.on(eventName, listener) - }) - this.on('removeListener', (eventName, listener) => { - if (['newListener', 'removeListener'].includes(eventName)) return - this.#coreIndexer.off(eventName, listener) - }) } get indexer() { @@ -112,18 +111,45 @@ export class DataStore extends TypedEmitter { async #handleEntries(entries) { await this.#batch(entries) await Promise.all(this.#pendingAppends) + const toEmit = + /** @type {{ [S in TSchemaName]: Array> }} */ + ({}) // Writes to the writerCore need to wait until the entry is indexed before // returning, so we check if any incoming entry has a pending promise for (const entry of entries) { - if (!entry.key.equals(this.#writerCore.key)) continue - const versionId = getVersionId({ + const versionObj = { coreDiscoveryKey: discoveryKey(entry.key), index: entry.index, - }) + } + // We do this here rather than in IndexWriter (which is already decoding + // the entries and could return decoded entries from the batch function or + // emit events itself) because IndexWriter will eventually be in a worker + // thread, and my assumption is that sending a decoded doc over a + // MessageChannel from the worker is more expensive than decoding the + // entry here. This also avoids setting up RPC calls with the worker. + if (this.#emitUpdates) { + try { + const { schemaName } = decodeBlockPrefix(entry.block) + // @ts-ignore + if (!this.#emitUpdates.has(schemaName)) return + // @ts-ignore + toEmit[schemaName] = toEmit[schemaName] || [] + // @ts-ignore + toEmit[schemaName].push(decode(entry.block, versionObj)) + } catch (e) { + // Ignore docs we can't decode + } + } + if (!entry.key.equals(this.#writerCore.key)) continue + const versionId = getVersionId(versionObj) const pending = this.#pendingIndex.get(versionId) if (!pending) continue pending.resolve() } + for (const [schemaName, docs] of Object.entries(toEmit)) { + // @ts-ignore + this.emit(schemaName, docs) + } } /** diff --git a/src/datatype/index.d.ts b/src/datatype/index.d.ts index 9397857b8..cf01a07b7 100644 --- a/src/datatype/index.d.ts +++ b/src/datatype/index.d.ts @@ -29,6 +29,7 @@ type MapeoDocTablesMap = { export const kCreateWithDocId: unique symbol export const kSelect: unique symbol export const kTable: unique symbol +export const kDataStore: unique symbol type OmitUnion = T extends any ? Omit : never type ExcludeSchema< @@ -60,7 +61,7 @@ export class DataType< get [kTable](): TTable - get writerCore(): Hypercore<'binary', Buffer> + get [kDataStore](): TDataStore [kCreateWithDocId]( docId: string, diff --git a/src/datatype/index.js b/src/datatype/index.js index c072a1f41..80b898eb5 100644 --- a/src/datatype/index.js +++ b/src/datatype/index.js @@ -47,6 +47,7 @@ function generateDate() { export const kCreateWithDocId = Symbol('kCreateWithDocId') export const kSelect = Symbol('select') export const kTable = Symbol('table') +export const kDataStore = Symbol('dataStore') /** * @template {import('../datastore/index.js').DataStore} TDataStore @@ -91,8 +92,8 @@ export class DataType { return this.#table } - get writerCore() { - return this.#dataStore.writerCore + get [kDataStore]() { + return this.#dataStore } /** diff --git a/src/generated/extensions.js b/src/generated/extensions.js index c51e6dae7..741d330dd 100644 --- a/src/generated/extensions.js +++ b/src/generated/extensions.js @@ -1,258 +1,309 @@ /* eslint-disable */ -import Long from "long"; -import _m0 from "protobufjs/minimal.js"; +import Long from 'long' +import _m0 from 'protobufjs/minimal.js' export var HaveExtension_Namespace = { - auth: "auth", - config: "config", - data: "data", - blobIndex: "blobIndex", - blob: "blob", - UNRECOGNIZED: "UNRECOGNIZED", -}; + auth: 'auth', + config: 'config', + data: 'data', + blobIndex: 'blobIndex', + blob: 'blob', + UNRECOGNIZED: 'UNRECOGNIZED', +} export function haveExtension_NamespaceFromJSON(object) { - switch (object) { - case 0: - case "auth": - return HaveExtension_Namespace.auth; - case 1: - case "config": - return HaveExtension_Namespace.config; - case 2: - case "data": - return HaveExtension_Namespace.data; - case 3: - case "blobIndex": - return HaveExtension_Namespace.blobIndex; - case 4: - case "blob": - return HaveExtension_Namespace.blob; - case -1: - case "UNRECOGNIZED": - default: - return HaveExtension_Namespace.UNRECOGNIZED; - } + switch (object) { + case 0: + case 'auth': + return HaveExtension_Namespace.auth + case 1: + case 'config': + return HaveExtension_Namespace.config + case 2: + case 'data': + return HaveExtension_Namespace.data + case 3: + case 'blobIndex': + return HaveExtension_Namespace.blobIndex + case 4: + case 'blob': + return HaveExtension_Namespace.blob + case -1: + case 'UNRECOGNIZED': + default: + return HaveExtension_Namespace.UNRECOGNIZED + } } export function haveExtension_NamespaceToNumber(object) { - switch (object) { - case HaveExtension_Namespace.auth: - return 0; - case HaveExtension_Namespace.config: - return 1; - case HaveExtension_Namespace.data: - return 2; - case HaveExtension_Namespace.blobIndex: - return 3; - case HaveExtension_Namespace.blob: - return 4; - case HaveExtension_Namespace.UNRECOGNIZED: - default: - return -1; - } + switch (object) { + case HaveExtension_Namespace.auth: + return 0 + case HaveExtension_Namespace.config: + return 1 + case HaveExtension_Namespace.data: + return 2 + case HaveExtension_Namespace.blobIndex: + return 3 + case HaveExtension_Namespace.blob: + return 4 + case HaveExtension_Namespace.UNRECOGNIZED: + default: + return -1 + } } function createBaseProjectExtension() { - return { - wantCoreKeys: [], - authCoreKeys: [], - configCoreKeys: [], - dataCoreKeys: [], - blobIndexCoreKeys: [], - blobCoreKeys: [], - }; + return { + wantCoreKeys: [], + authCoreKeys: [], + configCoreKeys: [], + dataCoreKeys: [], + blobIndexCoreKeys: [], + blobCoreKeys: [], + } } export var ProjectExtension = { - encode: function (message, writer) { - if (writer === void 0) { writer = _m0.Writer.create(); } - for (var _i = 0, _a = message.wantCoreKeys; _i < _a.length; _i++) { - var v = _a[_i]; - writer.uint32(10).bytes(v); - } - for (var _b = 0, _c = message.authCoreKeys; _b < _c.length; _b++) { - var v = _c[_b]; - writer.uint32(18).bytes(v); - } - for (var _d = 0, _e = message.configCoreKeys; _d < _e.length; _d++) { - var v = _e[_d]; - writer.uint32(26).bytes(v); - } - for (var _f = 0, _g = message.dataCoreKeys; _f < _g.length; _f++) { - var v = _g[_f]; - writer.uint32(34).bytes(v); - } - for (var _h = 0, _j = message.blobIndexCoreKeys; _h < _j.length; _h++) { - var v = _j[_h]; - writer.uint32(42).bytes(v); - } - for (var _k = 0, _l = message.blobCoreKeys; _k < _l.length; _k++) { - var v = _l[_k]; - writer.uint32(50).bytes(v); - } - return writer; - }, - decode: function (input, length) { - var reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); - var end = length === undefined ? reader.len : reader.pos + length; - var message = createBaseProjectExtension(); - while (reader.pos < end) { - var tag = reader.uint32(); - switch (tag >>> 3) { - case 1: - if (tag !== 10) { - break; - } - message.wantCoreKeys.push(reader.bytes()); - continue; - case 2: - if (tag !== 18) { - break; - } - message.authCoreKeys.push(reader.bytes()); - continue; - case 3: - if (tag !== 26) { - break; - } - message.configCoreKeys.push(reader.bytes()); - continue; - case 4: - if (tag !== 34) { - break; - } - message.dataCoreKeys.push(reader.bytes()); - continue; - case 5: - if (tag !== 42) { - break; - } - message.blobIndexCoreKeys.push(reader.bytes()); - continue; - case 6: - if (tag !== 50) { - break; - } - message.blobCoreKeys.push(reader.bytes()); - continue; - } - if ((tag & 7) === 4 || tag === 0) { - break; - } - reader.skipType(tag & 7); - } - return message; - }, - create: function (base) { - return ProjectExtension.fromPartial(base !== null && base !== void 0 ? base : {}); - }, - fromPartial: function (object) { - var _a, _b, _c, _d, _e, _f; - var message = createBaseProjectExtension(); - message.wantCoreKeys = ((_a = object.wantCoreKeys) === null || _a === void 0 ? void 0 : _a.map(function (e) { return e; })) || []; - message.authCoreKeys = ((_b = object.authCoreKeys) === null || _b === void 0 ? void 0 : _b.map(function (e) { return e; })) || []; - message.configCoreKeys = ((_c = object.configCoreKeys) === null || _c === void 0 ? void 0 : _c.map(function (e) { return e; })) || []; - message.dataCoreKeys = ((_d = object.dataCoreKeys) === null || _d === void 0 ? void 0 : _d.map(function (e) { return e; })) || []; - message.blobIndexCoreKeys = ((_e = object.blobIndexCoreKeys) === null || _e === void 0 ? void 0 : _e.map(function (e) { return e; })) || []; - message.blobCoreKeys = ((_f = object.blobCoreKeys) === null || _f === void 0 ? void 0 : _f.map(function (e) { return e; })) || []; - return message; - }, -}; + encode: function (message, writer) { + if (writer === void 0) { + writer = _m0.Writer.create() + } + for (var _i = 0, _a = message.wantCoreKeys; _i < _a.length; _i++) { + var v = _a[_i] + writer.uint32(10).bytes(v) + } + for (var _b = 0, _c = message.authCoreKeys; _b < _c.length; _b++) { + var v = _c[_b] + writer.uint32(18).bytes(v) + } + for (var _d = 0, _e = message.configCoreKeys; _d < _e.length; _d++) { + var v = _e[_d] + writer.uint32(26).bytes(v) + } + for (var _f = 0, _g = message.dataCoreKeys; _f < _g.length; _f++) { + var v = _g[_f] + writer.uint32(34).bytes(v) + } + for (var _h = 0, _j = message.blobIndexCoreKeys; _h < _j.length; _h++) { + var v = _j[_h] + writer.uint32(42).bytes(v) + } + for (var _k = 0, _l = message.blobCoreKeys; _k < _l.length; _k++) { + var v = _l[_k] + writer.uint32(50).bytes(v) + } + return writer + }, + decode: function (input, length) { + var reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input) + var end = length === undefined ? reader.len : reader.pos + length + var message = createBaseProjectExtension() + while (reader.pos < end) { + var tag = reader.uint32() + switch (tag >>> 3) { + case 1: + if (tag !== 10) { + break + } + message.wantCoreKeys.push(reader.bytes()) + continue + case 2: + if (tag !== 18) { + break + } + message.authCoreKeys.push(reader.bytes()) + continue + case 3: + if (tag !== 26) { + break + } + message.configCoreKeys.push(reader.bytes()) + continue + case 4: + if (tag !== 34) { + break + } + message.dataCoreKeys.push(reader.bytes()) + continue + case 5: + if (tag !== 42) { + break + } + message.blobIndexCoreKeys.push(reader.bytes()) + continue + case 6: + if (tag !== 50) { + break + } + message.blobCoreKeys.push(reader.bytes()) + continue + } + if ((tag & 7) === 4 || tag === 0) { + break + } + reader.skipType(tag & 7) + } + return message + }, + create: function (base) { + return ProjectExtension.fromPartial( + base !== null && base !== void 0 ? base : {} + ) + }, + fromPartial: function (object) { + var _a, _b, _c, _d, _e, _f + var message = createBaseProjectExtension() + message.wantCoreKeys = + ((_a = object.wantCoreKeys) === null || _a === void 0 + ? void 0 + : _a.map(function (e) { + return e + })) || [] + message.authCoreKeys = + ((_b = object.authCoreKeys) === null || _b === void 0 + ? void 0 + : _b.map(function (e) { + return e + })) || [] + message.configCoreKeys = + ((_c = object.configCoreKeys) === null || _c === void 0 + ? void 0 + : _c.map(function (e) { + return e + })) || [] + message.dataCoreKeys = + ((_d = object.dataCoreKeys) === null || _d === void 0 + ? void 0 + : _d.map(function (e) { + return e + })) || [] + message.blobIndexCoreKeys = + ((_e = object.blobIndexCoreKeys) === null || _e === void 0 + ? void 0 + : _e.map(function (e) { + return e + })) || [] + message.blobCoreKeys = + ((_f = object.blobCoreKeys) === null || _f === void 0 + ? void 0 + : _f.map(function (e) { + return e + })) || [] + return message + }, +} function createBaseHaveExtension() { - return { - discoveryKey: Buffer.alloc(0), - start: 0, - encodedBitfield: Buffer.alloc(0), - namespace: HaveExtension_Namespace.auth, - }; + return { + discoveryKey: Buffer.alloc(0), + start: 0, + encodedBitfield: Buffer.alloc(0), + namespace: HaveExtension_Namespace.auth, + } } export var HaveExtension = { - encode: function (message, writer) { - if (writer === void 0) { writer = _m0.Writer.create(); } - if (message.discoveryKey.length !== 0) { - writer.uint32(10).bytes(message.discoveryKey); - } - if (message.start !== 0) { - writer.uint32(16).uint64(message.start); - } - if (message.encodedBitfield.length !== 0) { - writer.uint32(26).bytes(message.encodedBitfield); - } - if (message.namespace !== HaveExtension_Namespace.auth) { - writer.uint32(32).int32(haveExtension_NamespaceToNumber(message.namespace)); - } - return writer; - }, - decode: function (input, length) { - var reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); - var end = length === undefined ? reader.len : reader.pos + length; - var message = createBaseHaveExtension(); - while (reader.pos < end) { - var tag = reader.uint32(); - switch (tag >>> 3) { - case 1: - if (tag !== 10) { - break; - } - message.discoveryKey = reader.bytes(); - continue; - case 2: - if (tag !== 16) { - break; - } - message.start = longToNumber(reader.uint64()); - continue; - case 3: - if (tag !== 26) { - break; - } - message.encodedBitfield = reader.bytes(); - continue; - case 4: - if (tag !== 32) { - break; - } - message.namespace = haveExtension_NamespaceFromJSON(reader.int32()); - continue; - } - if ((tag & 7) === 4 || tag === 0) { - break; - } - reader.skipType(tag & 7); - } - return message; - }, - create: function (base) { - return HaveExtension.fromPartial(base !== null && base !== void 0 ? base : {}); - }, - fromPartial: function (object) { - var _a, _b, _c, _d; - var message = createBaseHaveExtension(); - message.discoveryKey = (_a = object.discoveryKey) !== null && _a !== void 0 ? _a : Buffer.alloc(0); - message.start = (_b = object.start) !== null && _b !== void 0 ? _b : 0; - message.encodedBitfield = (_c = object.encodedBitfield) !== null && _c !== void 0 ? _c : Buffer.alloc(0); - message.namespace = (_d = object.namespace) !== null && _d !== void 0 ? _d : HaveExtension_Namespace.auth; - return message; - }, -}; -var tsProtoGlobalThis = (function () { - if (typeof globalThis !== "undefined") { - return globalThis; + encode: function (message, writer) { + if (writer === void 0) { + writer = _m0.Writer.create() } - if (typeof self !== "undefined") { - return self; + if (message.discoveryKey.length !== 0) { + writer.uint32(10).bytes(message.discoveryKey) } - if (typeof window !== "undefined") { - return window; + if (message.start !== 0) { + writer.uint32(16).uint64(message.start) } - if (typeof global !== "undefined") { - return global; + if (message.encodedBitfield.length !== 0) { + writer.uint32(26).bytes(message.encodedBitfield) } - throw "Unable to locate global object"; -})(); -function longToNumber(long) { - if (long.gt(Number.MAX_SAFE_INTEGER)) { - throw new tsProtoGlobalThis.Error("Value is larger than Number.MAX_SAFE_INTEGER"); + if (message.namespace !== HaveExtension_Namespace.auth) { + writer + .uint32(32) + .int32(haveExtension_NamespaceToNumber(message.namespace)) } - return long.toNumber(); + return writer + }, + decode: function (input, length) { + var reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input) + var end = length === undefined ? reader.len : reader.pos + length + var message = createBaseHaveExtension() + while (reader.pos < end) { + var tag = reader.uint32() + switch (tag >>> 3) { + case 1: + if (tag !== 10) { + break + } + message.discoveryKey = reader.bytes() + continue + case 2: + if (tag !== 16) { + break + } + message.start = longToNumber(reader.uint64()) + continue + case 3: + if (tag !== 26) { + break + } + message.encodedBitfield = reader.bytes() + continue + case 4: + if (tag !== 32) { + break + } + message.namespace = haveExtension_NamespaceFromJSON(reader.int32()) + continue + } + if ((tag & 7) === 4 || tag === 0) { + break + } + reader.skipType(tag & 7) + } + return message + }, + create: function (base) { + return HaveExtension.fromPartial( + base !== null && base !== void 0 ? base : {} + ) + }, + fromPartial: function (object) { + var _a, _b, _c, _d + var message = createBaseHaveExtension() + message.discoveryKey = + (_a = object.discoveryKey) !== null && _a !== void 0 + ? _a + : Buffer.alloc(0) + message.start = (_b = object.start) !== null && _b !== void 0 ? _b : 0 + message.encodedBitfield = + (_c = object.encodedBitfield) !== null && _c !== void 0 + ? _c + : Buffer.alloc(0) + message.namespace = + (_d = object.namespace) !== null && _d !== void 0 + ? _d + : HaveExtension_Namespace.auth + return message + }, +} +var tsProtoGlobalThis = (function () { + if (typeof globalThis !== 'undefined') { + return globalThis + } + if (typeof self !== 'undefined') { + return self + } + if (typeof window !== 'undefined') { + return window + } + if (typeof global !== 'undefined') { + return global + } + throw 'Unable to locate global object' +})() +function longToNumber(long) { + if (long.gt(Number.MAX_SAFE_INTEGER)) { + throw new tsProtoGlobalThis.Error( + 'Value is larger than Number.MAX_SAFE_INTEGER' + ) + } + return long.toNumber() } if (_m0.util.Long !== Long) { - _m0.util.Long = Long; - _m0.configure(); + _m0.util.Long = Long + _m0.configure() } diff --git a/src/logger.js b/src/logger.js index 80de7b9db..72d9f5b96 100644 --- a/src/logger.js +++ b/src/logger.js @@ -59,6 +59,7 @@ export class Logger { static create(ns, logger) { if (logger) return logger.extend(ns) const i = (counts.get(ns) || 0) + 1 + counts.set(ns, i) const deviceId = String(i).padStart(TRIM, '0') return new Logger({ deviceId, ns }) } diff --git a/src/mapeo-project.js b/src/mapeo-project.js index 335af9bdf..480ef0e02 100644 --- a/src/mapeo-project.js +++ b/src/mapeo-project.js @@ -158,6 +158,7 @@ export class MapeoProject { namespace: 'auth', batch: (entries) => indexWriter.batch(entries), storage: indexerStorage, + emitUpdates: ['role', 'coreOwnership'], }), config: new DataStore({ coreManager: this.#coreManager, @@ -283,6 +284,7 @@ export class MapeoProject { this.#syncApi = new SyncApi({ coreManager: this.#coreManager, capabilities: this.#capabilities, + coreOwnership: this.#coreOwnership, logger: this.#l, }) diff --git a/src/sync/peer-sync-controller.js b/src/sync/peer-sync-controller.js index d1d55aa09..7dbcdc663 100644 --- a/src/sync/peer-sync-controller.js +++ b/src/sync/peer-sync-controller.js @@ -1,4 +1,3 @@ -import mapObject from 'map-obj' import { NAMESPACES } from '../core-manager/index.js' import { Logger } from '../logger.js' @@ -13,58 +12,78 @@ import { Logger } from '../logger.js' export const PRESYNC_NAMESPACES = ['auth', 'config', 'blobIndex'] export class PeerSyncController { + /** @type {Set>} */ #replicatingCores = new Set() /** @type {Set} */ #enabledNamespaces = new Set() #coreManager - #protomux + #creatorCorePeer #capabilities /** @type {Record} */ #syncCapability = createNamespaceMap('unknown') #isDataSyncEnabled = false - /** @type {Record} */ - #prevLocalState = createNamespaceMap(null) - /** @type {SyncStatus} */ - #syncStatus = createNamespaceMap('unknown') - /** @type {Map, ReturnType>} */ - #downloadingRanges = new Map() - /** @type {SyncStatus} */ - #prevSyncStatus = createNamespaceMap('unknown') + #hasSentHaves = createNamespaceMap(false) #log + #syncState + #presyncDone = false /** * @param {object} opts - * @param {import("protomux")} opts.protomux + * @param {import('./sync-api.js').Peer} opts.creatorCorePeer * @param {import("../core-manager/index.js").CoreManager} opts.coreManager * @param {import("./sync-state.js").SyncState} opts.syncState * @param {import("../capabilities.js").Capabilities} opts.capabilities * @param {Logger} [opts.logger] */ - constructor({ protomux, coreManager, syncState, capabilities, logger }) { + constructor({ + creatorCorePeer, + coreManager, + syncState, + capabilities, + logger, + }) { // @ts-ignore this.#log = (formatter, ...args) => { const log = Logger.create('peer', logger).log return log.apply(null, [ `[%h] ${formatter}`, - protomux.stream.remotePublicKey, + creatorCorePeer.remotePublicKey, ...args, ]) } this.#coreManager = coreManager - this.#protomux = protomux + this.#creatorCorePeer = creatorCorePeer this.#capabilities = capabilities + this.#syncState = syncState - // Always need to replicate the project creator core - this.#replicateCore(coreManager.creatorCore) + // The creator core is replicating before this instance is created + this.#replicatingCores = new Set([coreManager.creatorCore]) + + // A PeerSyncController instance is only created once the creator cores are + // replicating, which imeans that the peer has the project key, so now we + // can send all the auth core keys. + // + // We could reduce network traffic by delaying sending this until we see + // which keys the peer already has, so that we only send the keys they are + // missing. However the network traffic cost of sending keys is low (it's 8 + // bytes * number of devices in a project) vs. the delay in sync e.g. if the + // delay is more than the time it takes to share the keys, it's not worth + // it. + coreManager.sendAuthCoreKeys(creatorCorePeer) coreManager.on('add-core', this.#handleAddCore) syncState.on('state', this.#handleStateChange) + capabilities.on('update', this.#handleCapabilitiesUpdate) this.#updateEnabledNamespaces() } + get protomux() { + return this.#creatorCorePeer.protomux + } + get peerKey() { - return this.#protomux.stream.remotePublicKey + return this.#creatorCorePeer.remotePublicKey } get peerId() { @@ -99,24 +118,24 @@ export class PeerSyncController { */ handleDiscoveryKey(discoveryKey) { const coreRecord = this.#coreManager.getCoreByDiscoveryKey(discoveryKey) - // If we already know about this core, then we will add it to the - // replication stream when we are ready - if (coreRecord) { - this.#log( - 'Received discovery key %h, but already have core in namespace %s', - discoveryKey, - coreRecord.namespace - ) - if (this.#enabledNamespaces.has(coreRecord.namespace)) { - this.#replicateCore(coreRecord.core) - } - return - } - if (!this.peerKey) { - this.#log('Unexpected null peerKey') - return + // If we don't have the core record, we'll add and replicate it when we + // receive the core key via an extension or from a core ownership record. + if (!coreRecord) return + + this.#log( + 'Received discovery key %h, but already have core in namespace %s', + discoveryKey, + coreRecord.namespace + ) + if (this.#enabledNamespaces.has(coreRecord.namespace)) { + this.#replicateCore(coreRecord.core) } - this.#coreManager.requestCoreKey(this.peerKey, discoveryKey) + } + + destroy() { + this.#coreManager.off('add-core', this.#handleAddCore) + this.#syncState.off('state', this.#handleStateChange) + this.#capabilities.off('update', this.#handleCapabilitiesUpdate) } /** @@ -137,48 +156,54 @@ export class PeerSyncController { * @param {import("./sync-state.js").State} state */ #handleStateChange = async (state) => { - // The remotePublicKey is only available after the noise stream has - // connected. We shouldn't get a state change before the noise stream has - // connected, but if we do we can ignore it because we won't have any useful - // information until it connects. - if (!this.peerId) return - this.#syncStatus = getSyncStatus(this.peerId, state) - const localState = mapObject(state, (ns, nsState) => { - return [ns, nsState.localState] - }) - this.#log('state %X', state) - - // Map of which namespaces have received new data since last sync change - const didUpdate = mapObject(state, (ns) => { - const nsDidSync = - this.#prevSyncStatus[ns] !== 'synced' && - this.#syncStatus[ns] === 'synced' - const prevNsState = this.#prevLocalState[ns] - const nsDidUpdate = - nsDidSync && - (prevNsState === null || prevNsState.have !== localState[ns].have) - if (nsDidUpdate) { - this.#prevLocalState[ns] = localState[ns] - } - return [ns, nsDidUpdate] + if (this.#presyncDone) return + const syncStatus = getSyncStatus(this.peerId, state) + this.#presyncDone = PRESYNC_NAMESPACES.every((ns) => { + return syncStatus[ns] === 'synced' }) - this.#prevSyncStatus = this.#syncStatus - - if (didUpdate.auth) { - try { - const cap = await this.#capabilities.getCapabilities(this.peerId) - this.#syncCapability = cap.sync - } catch (e) { - this.#log('Error reading capability', e) - // Any error, consider sync unknown - this.#syncCapability = createNamespaceMap('unknown') - } + if (!this.#presyncDone) return + this.#log('Pre-sync done') + // Once pre-sync is done, if data sync is enabled and the peer has the + // correct capabilities, then we will enable sync of data namespaces + this.#updateEnabledNamespaces() + } + + /** + * Handler for capabilities being updated. If they have changed for this peer + * then we update enabled namespaces and send pre-haves for any namespaces + * authorized for sync + * + * @param {import('@mapeo/schema').Role[]} docs + */ + #handleCapabilitiesUpdate = async (docs) => { + const peerRoleUpdated = docs.some((doc) => doc.docId === this.peerId) + if (!peerRoleUpdated) return + const prevSyncCapability = this.#syncCapability + try { + const cap = await this.#capabilities.getCapabilities(this.peerId) + this.#syncCapability = cap.sync + } catch (e) { + this.#log('Error reading capability', e) + // Any error, consider sync unknown + this.#syncCapability = createNamespaceMap('unknown') } - this.#log('capability %o', this.#syncCapability) + const syncCapabilityChanged = !shallowEqual( + prevSyncCapability, + this.#syncCapability + ) + if (!syncCapabilityChanged) return + + this.#log('Sync capability changed %o', this.#syncCapability) + this.#updateEnabledNamespaces() - // If any namespace has new data, update what is enabled - if (Object.values(didUpdate).indexOf(true) > -1) { - this.#updateEnabledNamespaces() + // Send pre-haves for any namespaces that the peer is allowed to sync + for (const ns of NAMESPACES) { + if (ns === 'auth') continue + if (this.#hasSentHaves[ns]) continue + if (this.#syncCapability[ns] !== 'allowed') continue + this.#coreManager.sendHaves(this.#creatorCorePeer, ns) + this.#log('Sent pre-haves for %s', ns) + this.#hasSentHaves[ns] = true } } @@ -200,14 +225,9 @@ export class PeerSyncController { } else if (cap === 'allowed') { if (PRESYNC_NAMESPACES.includes(ns)) { this.#enableNamespace(ns) - } else if (this.#isDataSyncEnabled) { - const arePresyncNamespacesSynced = PRESYNC_NAMESPACES.every( - (ns) => this.#syncStatus[ns] === 'synced' - ) + } else if (this.#isDataSyncEnabled && this.#presyncDone) { // Only enable data namespaces once the pre-sync namespaces have synced - if (arePresyncNamespacesSynced) { - this.#enableNamespace(ns) - } + this.#enableNamespace(ns) } else { this.#disableNamespace(ns) } @@ -225,7 +245,7 @@ export class PeerSyncController { #replicateCore(core) { if (this.#replicatingCores.has(core)) return this.#log('replicating core %k', core.key) - core.replicate(this.#protomux) + core.replicate(this.#creatorCorePeer.protomux) core.on('peer-remove', (peer) => { if (!peer.remotePublicKey.equals(this.peerKey)) return this.#log('peer-remove %h from core %k', peer.remotePublicKey, core.key) @@ -239,7 +259,7 @@ export class PeerSyncController { #unreplicateCore(core) { if (core === this.#coreManager.creatorCore) return const peerToUnreplicate = core.peers.find( - (peer) => peer.protomux === this.#protomux + (peer) => peer.protomux === this.#creatorCorePeer.protomux ) if (!peerToUnreplicate) return this.#log('unreplicating core %k', core.key) @@ -312,3 +332,17 @@ function createNamespaceMap(value) { } return map } + +/** + * Very naive shallow equal, but all we need for comparing sync capabilities + * + * @param {Record} a + * @param {Record} b + * @returns + */ +function shallowEqual(a, b) { + for (const key of Object.keys(a)) { + if (a[key] !== b[key]) return false + } + return true +} diff --git a/src/sync/sync-api.js b/src/sync/sync-api.js index 018de6e9b..c9f93ff42 100644 --- a/src/sync/sync-api.js +++ b/src/sync/sync-api.js @@ -6,10 +6,16 @@ import { } from './peer-sync-controller.js' import { Logger } from '../logger.js' import { NAMESPACES } from '../core-manager/index.js' -import { keyToId } from '../utils.js' export const kHandleDiscoveryKey = Symbol('handle discovery key') +/** + * @typedef {{ + * protomux: import('protomux'), + * remotePublicKey: Buffer + * }} Peer + */ + /** * @typedef {object} SyncEvents * @property {(syncState: import('./sync-state.js').State) => void} sync-state @@ -22,40 +28,48 @@ export class SyncApi extends TypedEmitter { syncState #coreManager #capabilities - /** @type {Map} */ - #peerSyncControllers = new Map() - /** @type {Set} */ - #peerIds = new Set() + #PSCIndex = new PSCIndex() /** @type {Set<'local' | 'remote'>} */ #dataSyncEnabled = new Set() /** @type {Map>} */ #pendingDiscoveryKeys = new Map() #l + #coreOwnership /** * * @param {object} opts * @param {import('../core-manager/index.js').CoreManager} opts.coreManager * @param {import("../capabilities.js").Capabilities} opts.capabilities + * @param {import('../core-ownership.js').CoreOwnership} opts.coreOwnership * @param {number} [opts.throttleMs] * @param {Logger} [opts.logger] */ - constructor({ coreManager, throttleMs = 200, capabilities, logger }) { + constructor({ + coreManager, + capabilities, + coreOwnership, + throttleMs = 200, + logger, + }) { super() this.#l = Logger.create('syncApi', logger) this.#coreManager = coreManager this.#capabilities = capabilities + this.#coreOwnership = coreOwnership this.syncState = new SyncState({ coreManager, throttleMs }) this.syncState.setMaxListeners(0) this.syncState.on('state', this.emit.bind(this, 'sync-state')) this.#coreManager.creatorCore.on('peer-add', this.#handlePeerAdd) this.#coreManager.creatorCore.on('peer-remove', this.#handlePeerRemove) + capabilities.on('update', this.#handleRoleUdpate.bind(this)) + coreOwnership.on('update', this.#handleCoreOwnershipUpdate.bind(this)) } /** @type {import('../local-peers.js').LocalPeersEvents['discovery-key']} */ [kHandleDiscoveryKey](discoveryKey, protomux) { - const peerSyncController = this.#peerSyncControllers.get(protomux) + const peerSyncController = this.#PSCIndex.getByProtomux(protomux) if (peerSyncController) { peerSyncController.handleDiscoveryKey(discoveryKey) return @@ -92,7 +106,7 @@ export class SyncApi extends TypedEmitter { if (this.#dataSyncEnabled.has('local')) return this.#dataSyncEnabled.add('local') this.#l.log('Starting data sync') - for (const peerSyncController of this.#peerSyncControllers.values()) { + for (const peerSyncController of this.#PSCIndex.values()) { peerSyncController.enableDataSync() } } @@ -104,7 +118,7 @@ export class SyncApi extends TypedEmitter { if (!this.#dataSyncEnabled.has('local')) return this.#dataSyncEnabled.delete('local') this.#l.log('Stopping data sync') - for (const peerSyncController of this.#peerSyncControllers.values()) { + for (const peerSyncController of this.#PSCIndex.values()) { peerSyncController.disableDataSync() } } @@ -115,10 +129,10 @@ export class SyncApi extends TypedEmitter { async waitForSync(type) { const state = this.getState() const namespaces = type === 'initial' ? PRESYNC_NAMESPACES : NAMESPACES - if (isSynced(state, namespaces, this.#peerSyncControllers)) return + if (isSynced(state, namespaces, this.#PSCIndex.values())) return return new Promise((res) => { this.on('sync-state', function onState(state) { - if (!isSynced(state, namespaces, this.#peerSyncControllers)) return + if (!isSynced(state, namespaces, this.#PSCIndex.values())) return this.off('sync-state', onState) res(null) }) @@ -133,26 +147,25 @@ export class SyncApi extends TypedEmitter { * will then handle validation of role records to ensure that the peer is * actually still part of the project. * - * @param {{ protomux: import('protomux') }} peer + * @param {Peer} peer */ #handlePeerAdd = (peer) => { const { protomux } = peer - if (this.#peerSyncControllers.has(protomux)) { + if (this.#PSCIndex.hasProtomux(protomux)) { this.#l.log( 'Unexpected existing peer sync controller for peer %h', - protomux.stream.remotePublicKey + peer.remotePublicKey ) return } const peerSyncController = new PeerSyncController({ - protomux, + creatorCorePeer: peer, coreManager: this.#coreManager, syncState: this.syncState, capabilities: this.#capabilities, logger: this.#l, }) - this.#peerSyncControllers.set(protomux, peerSyncController) - if (peerSyncController.peerId) this.#peerIds.add(peerSyncController.peerId) + this.#PSCIndex.add(peerSyncController) if (this.#dataSyncEnabled.has('local')) { peerSyncController.enableDataSync() @@ -173,21 +186,75 @@ export class SyncApi extends TypedEmitter { * Called when a peer is removed from the creator core, e.g. when the * connection is terminated. * - * @param {{ protomux: import('protomux'), remotePublicKey: Buffer }} peer + * @param {Peer} peer */ #handlePeerRemove = (peer) => { const { protomux } = peer - if (!this.#peerSyncControllers.has(protomux)) { + const psc = this.#PSCIndex.getByProtomux(protomux) + if (!psc) { this.#l.log( 'Unexpected no existing peer sync controller for peer %h', protomux.stream.remotePublicKey ) return } - this.#peerSyncControllers.delete(protomux) - this.#peerIds.delete(keyToId(peer.remotePublicKey)) + psc.destroy() + this.#PSCIndex.delete(psc) this.#pendingDiscoveryKeys.delete(protomux) } + + /** + * @param {import('@mapeo/schema').Role[]} roleDocs + */ + async #handleRoleUdpate(roleDocs) { + /** @type {Set} */ + const updatedDeviceIds = new Set() + for (const doc of roleDocs) { + // Ignore docs about ourselves + if (doc.docId === this.#coreManager.deviceId) continue + updatedDeviceIds.add(doc.docId) + } + const coreOwnershipPromises = [] + for (const deviceId of updatedDeviceIds) { + coreOwnershipPromises.push(this.#coreOwnership.get(deviceId)) + } + const ownershipResults = await Promise.allSettled(coreOwnershipPromises) + for (const result of ownershipResults) { + if (result.status === 'rejected') continue + this.#addCores(result.value) + this.#l.log('Added cores for device %S', result.value.docId) + } + } + + /** + * @param {import('@mapeo/schema').CoreOwnership[]} coreOwnershipDocs + */ + async #handleCoreOwnershipUpdate(coreOwnershipDocs) { + for (const coreOwnershipDoc of coreOwnershipDocs) { + // Ignore our own ownership doc - we don't need to add cores for ourselves + if (coreOwnershipDoc.docId === this.#coreManager.deviceId) continue + try { + // We don't actually need the role, we just need to check if it exists + await this.#capabilities.getCapabilities(coreOwnershipDoc.docId) + this.#addCores(coreOwnershipDoc) + this.#l.log('Added cores for device %S', coreOwnershipDoc.docId) + } catch (e) { + // Ignore, we'll add these when the role is added + this.#l.log('No role for device %S', coreOwnershipDoc.docId) + } + } + } + + /** + * @param {import('@mapeo/schema').CoreOwnership} coreOwnership + */ + #addCores(coreOwnership) { + for (const ns of NAMESPACES) { + if (ns === 'auth') continue + const coreKey = Buffer.from(coreOwnership[`${ns}CoreId`], 'hex') + this.#coreManager.addCore(coreKey, ns) + } + } } /** @@ -195,12 +262,12 @@ export class SyncApi extends TypedEmitter { * * @param {import('./sync-state.js').State} state * @param {readonly import('../core-manager/index.js').Namespace[]} namespaces - * @param {Map} peerSyncControllers + * @param {Iterable} peerSyncControllers */ function isSynced(state, namespaces, peerSyncControllers) { for (const ns of namespaces) { if (state[ns].dataToSync) return false - for (const psc of peerSyncControllers.values()) { + for (const psc of peerSyncControllers) { const { peerId } = psc if (psc.syncCapability[ns] === 'blocked') continue if (!(peerId in state[ns].remoteStates)) return false @@ -209,3 +276,52 @@ function isSynced(state, namespaces, peerSyncControllers) { } return true } + +class PSCIndex { + /** @type {Map} */ + #byProtomux = new Map() + /** @type {Map>} */ + #byPeerId = new Map() + /** + * @param {PeerSyncController} psc + */ + add(psc) { + this.#byProtomux.set(psc.protomux, psc) + const peerSet = this.#byPeerId.get(psc.peerId) || new Set() + peerSet.add(psc) + this.#byPeerId.set(psc.peerId, peerSet) + } + values() { + return this.#byProtomux.values() + } + /** + * @param {import('protomux')} protomux + */ + hasProtomux(protomux) { + return this.#byProtomux.has(protomux) + } + /** + * @param {import('protomux')} protomux + */ + getByProtomux(protomux) { + return this.#byProtomux.get(protomux) + } + /** + * @param {string} peerId + */ + getByPeerId(peerId) { + return this.#byPeerId.get(peerId) + } + /** + * @param {PeerSyncController} psc + */ + delete(psc) { + this.#byProtomux.delete(psc.protomux) + const peerSet = this.#byPeerId.get(psc.peerId) + if (!peerSet) return + peerSet.delete(psc) + if (peerSet.size === 0) { + this.#byPeerId.delete(psc.peerId) + } + } +} diff --git a/test-e2e/sync.js b/test-e2e/sync.js index ffd9fe6f0..3fd1ada5f 100644 --- a/test-e2e/sync.js +++ b/test-e2e/sync.js @@ -190,71 +190,65 @@ test('shares cores', async function (t) { } }) -test('no sync capabilities === no namespaces sync apart from auth', async (t) => { - const COUNT = 3 - const managers = await createManagers(COUNT, t) - const [invitor, invitee, blocked] = managers - const disconnect1 = connectPeers(managers, { discovery: false }) - const projectId = await invitor.createProject() - await invite({ - invitor, - invitees: [blocked], - projectId, - roleId: BLOCKED_ROLE_ID, - }) - await invite({ - invitor, - invitees: [invitee], - projectId, - roleId: COORDINATOR_ROLE_ID, - }) - - const projects = await Promise.all( - managers.map((m) => m.getProject(projectId)) - ) - const [invitorProject, inviteeProject] = projects +test.solo( + 'no sync capabilities === no namespaces sync apart from auth', + async (t) => { + const COUNT = 3 + const managers = await createManagers(COUNT, t) + const [invitor, invitee, blocked] = managers + const disconnect1 = connectPeers(managers, { discovery: false }) + const projectId = await invitor.createProject() + await invite({ + invitor, + invitees: [blocked], + projectId, + roleId: BLOCKED_ROLE_ID, + }) + await invite({ + invitor, + invitees: [invitee], + projectId, + roleId: COORDINATOR_ROLE_ID, + }) + + const projects = await Promise.all( + managers.map((m) => m.getProject(projectId)) + ) + const [invitorProject, inviteeProject] = projects + + const generatedDocs = (await seedDatabases([inviteeProject])).flat() + const configDocsCount = generatedDocs.filter( + (doc) => doc.schemaName !== 'observation' + ).length + const dataDocsCount = generatedDocs.length - configDocsCount - const generatedDocs = (await seedDatabases([inviteeProject])).flat() - const configDocsCount = generatedDocs.filter( - (doc) => doc.schemaName !== 'observation' - ).length - const dataDocsCount = generatedDocs.length - configDocsCount + for (const project of projects) { + project.$sync.start() + } - for (const project of projects) { - project.$sync.start() - } + await waitForSync([inviteeProject, invitorProject], 'full') - await waitForSync([inviteeProject, invitorProject], 'full') + const [invitorState, inviteeState, blockedState] = projects.map((p) => + p.$sync.getState() + ) - const [invitorState, inviteeState, blockedState] = projects.map((p) => - p.$sync.getState() - ) + t.is(invitorState.config.localState.have, configDocsCount + COUNT) // count device info doc for each invited device + t.is(invitorState.data.localState.have, dataDocsCount) + console.dir(blockedState, { depth: null }) + t.is(blockedState.config.localState.have, 1) // just the device info doc + t.is(blockedState.data.localState.have, 0) // no data docs synced - t.is(invitorState.config.localState.have, configDocsCount + COUNT) // count device info doc for each invited device - t.is(invitorState.data.localState.have, dataDocsCount) - t.is(blockedState.config.localState.have, 1) // just the device info doc - t.is(blockedState.data.localState.have, 0) // no data docs synced + // Temp fix until we have .close() method - waits for indexing idle to ensure + // we don't close storage in teardown while index is still being written. + await Promise.all(projects.map((p) => p.$getProjectSettings())) - for (const ns of NAMESPACES) { - if (ns === 'auth') { - t.is(invitorState[ns].coreCount, 3) - t.is(inviteeState[ns].coreCount, 3) - t.is(blockedState[ns].coreCount, 3) - } else if (PRESYNC_NAMESPACES.includes(ns)) { + for (const ns of NAMESPACES) { t.is(invitorState[ns].coreCount, 3) t.is(inviteeState[ns].coreCount, 3) - t.is(blockedState[ns].coreCount, 1) - } else { - t.is(invitorState[ns].coreCount, 2) - t.is(inviteeState[ns].coreCount, 2) - t.is(blockedState[ns].coreCount, 1) + t.is(blockedState[ns].coreCount, 3, ns) + t.alike(invitorState[ns].localState, inviteeState[ns].localState) } - t.alike(invitorState[ns].localState, inviteeState[ns].localState) - } - - // Temp fix until we have .close() method - waits for indexing idle to ensure - // we don't close storage in teardown while index is still being written. - await Promise.all(projects.map((p) => p.$getProjectSettings())) - await disconnect1() -}) + await disconnect1() + } +) diff --git a/tests/datastore.js b/tests/datastore.js index cfde7a65f..e652a03cd 100644 --- a/tests/datastore.js +++ b/tests/datastore.js @@ -91,12 +91,12 @@ test('index events', async (t) => { }, storage: () => new RAM(), }) - dataStore.on('index-state', (state) => { + dataStore.indexer.on('index-state', (state) => { // eslint-disable-next-line no-unused-vars const { entriesPerSecond, ...rest } = state indexStates.push(rest) }) - const idlePromise = once(dataStore, 'idle') + const idlePromise = once(dataStore.indexer, 'idle') await dataStore.write(obs) await idlePromise const expectedStates = [