Skip to content

Commit

Permalink
fix: wait for index idle before returning data (#389)
Browse files Browse the repository at this point in the history
* WIP initial work

* rename Rpc to LocalPeers

* Handle deviceInfo internally, id -> deviceId

* Tests for stream error handling

* remove unnecessary constructor

* return replication stream

* Attach protomux instance to peer info

* rename and re-organize

* revert changes outside scope of PR

* WIP initial work

* Tie everything together

* rename getProjectInstance

* feat: client.listLocalPeers() & `local-peers` evt

* feat: add $sync API methods

For now this simplifies the API (because we are only supporting local
sync, not remote sync over the internet) to:

- `project.$sync.getState()`
- `project.$sync.start()`
- `project.$sync.stop()`
- Events
    - `sync-state`

It's currently not possible to stop local discovery, nor is it possible
to stop sync of the metadata namespaces (auth, config, blobIndex). The
start and stop methods stop the sync of the data and blob namespaces.

Fixes #134. Stacked on #360, #358 and #356.

* feat: Add project.$waitForInitialSync() method

Fixes Add project method to download auth + config cores #233

Rather than call this inside the `client.addProject()` method, instead I
think it is better for the API consumer to call
`project.$waitForInitialSync()` after adding a project, since this
allows the implementer to give user feedback about what is happening.

* Wait for initial sync within  addProject()

* fix: don't add core bitfield until core is ready

* feat: expose deviceId on coreManager

* fix: wait for project.ready() in waitForInitialSync

* fix: skip waitForSync in tests

* don't enable/disable namespace if not needed

* start core download when created via sparse: false

* Add debug logging

This was a big lift, but necessary to be able to debug sync issues since
temporarily adding console.log statements was too much work, and
debugging requires knowing the deviceId associated with each message.

* fix timeout

* fix: Add new cores to the indexer (!!!)

This caused a day of work: a bug from months back

* remove unnecessary log stmt

* get capabilities.getMany() to include creator

* fix invite test

* keep blob cores sparse

* optional param for LocalPeers

* re-org sync and replication

Removes old replication code attached to CoreManager
Still needs tests to be updated

* update package-lock

* chore: Add debug logging

* Add new logger to discovery + dnssd

* Get invite test working

* fix manager logger

* cleanup invite test (and make it fail :(

* fix: handle duplicate connections to LocalPeers

* fix stream close before channel open

* send invite to non-existent peer

* fixed fake timers implementation for tests

* new tests for duplicate connections

* cleanup and small fix

* Better state debug logging

* chain of invites test

* fix max listeners and add skipped test

* fix: only request a core key from one peer

Reduces the number of duplicate requests for the same keys.

* cleanup members tests with new helprs

* wait for project ready when adding

* only create 4 clients for chain of invites test

* add e2e sync tests

* add published @mapeo/mock-data

* fix: don't open cores in sparse mode

Turns out this changes how core.length etc. work, which confuses things

* fix: option to skip auto download for tests

* e2e test for stop-start sync

* fix coreManager unit tests

* fix blob store tests

* fix discovery-key event

* add coreCount to sync state

* test sync with blocked peer & fix bugs

* fix datatype unit tests

* fix blobs server unit tests

* remote peer-sync-controller unit test

This is now tested in e2e tests

* fix type issues caused by bad lockfile

* ignore debug type errors

* fixes for review comments

* move utils-new into utils

* Add debug info to test that sometimes fails

* Update package-lock.json version

* remove project.ready() (breaks things)

* wait for coreOwnership write before returning

* use file storage in tests (breaks things)

* Catch race condition in CRUD tests

* fix race condition with parallel writes

* fix tests for new createManagers syntax

* fix flakey test

This test relied on `peer.connectedAt` changing in order to distinguish
connections, but sometimes `connectedAt` was the same for both peers.
This adds a 1ms delay before making the second connection, to attempt to
stop the flakiness.

* fix: wait for index idle before returning data

* temp fixes to run CI

* small fix for failing test

* update to published multi-core-indexer

---------

Co-authored-by: Andrew Chou <[email protected]>
  • Loading branch information
gmaclennan and achou11 authored Nov 28, 2023
1 parent 33d0f20 commit faecad2
Show file tree
Hide file tree
Showing 9 changed files with 26 additions and 16 deletions.
9 changes: 4 additions & 5 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@
"hyperswarm": "4.4.1",
"magic-bytes.js": "^1.0.14",
"map-obj": "^5.0.2",
"multi-core-indexer": "1.0.0-alpha.7",
"multi-core-indexer": "^1.0.0-alpha.9",
"p-defer": "^4.0.0",
"p-timeout": "^6.1.2",
"patch-package": "^8.0.0",
Expand Down
2 changes: 1 addition & 1 deletion src/core-ownership.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ export class CoreOwnership {
expressions.push(eq(table[`${namespace}CoreId`], coreId))
}
// prettier-ignore
const result = this.#dataType[kSelect]()
const result = (await this.#dataType[kSelect]())
.where(or.apply(null, expressions))
.get()
if (!result) {
Expand Down
4 changes: 4 additions & 0 deletions src/datastore/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ export class DataStore extends TypedEmitter {
})
}

get indexer() {
return this.#coreIndexer
}

get namespace() {
return this.#namespace
}
Expand Down
7 changes: 6 additions & 1 deletion src/datatype/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ export class DataType {
* @param {string} docId
*/
async getByDocId(docId) {
await this.#dataStore.indexer.idle()
const result = this.#sql.getByDocId.get({ docId })
if (!result) throw new Error('Not found')
return deNullify(result)
Expand All @@ -151,6 +152,7 @@ export class DataType {
}

async getMany() {
await this.#dataStore.indexer.idle()
return this.#sql.getMany.all().map((doc) => deNullify(doc))
}

Expand All @@ -161,6 +163,7 @@ export class DataType {
* @param {T} value
*/
async update(versionId, value) {
await this.#dataStore.indexer.idle()
const links = Array.isArray(versionId) ? versionId : [versionId]
const { docId, createdAt, createdBy } = await this.#validateLinks(links)
/** @type {any} */
Expand All @@ -181,6 +184,7 @@ export class DataType {
* @param {string | string[]} versionId
*/
async delete(versionId) {
await this.#dataStore.indexer.idle()
const links = Array.isArray(versionId) ? versionId : [versionId]
const { docId, createdAt, createdBy } = await this.#validateLinks(links)
/** @type {any} */
Expand All @@ -200,7 +204,8 @@ export class DataType {
/**
* @param {Parameters<import('drizzle-orm/better-sqlite3').BetterSQLite3Database['select']>[0]} fields
*/
[kSelect](fields) {
async [kSelect](fields) {
await this.#dataStore.indexer.idle()
return this.#db.select(fields).from(this.#table)
}

Expand Down
4 changes: 3 additions & 1 deletion test-e2e/manager-basic.js
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,9 @@ test('Consistent storage folders', async (t) => {
},
{ waitForSync: false }
)
await manager.getProject(projectId)
const project = await manager.getProject(projectId)
// awaiting this ensures that indexing is done, which means that indexer storage is created
await project.$getOwnCapabilities()
}

// @ts-ignore snapshot() is missing from typedefs
Expand Down
8 changes: 6 additions & 2 deletions test-e2e/sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import { BLOCKED_ROLE_ID, COORDINATOR_ROLE_ID } from '../src/capabilities.js'

const SCHEMAS_INITIAL_SYNC = ['preset', 'field']

test('Create and sync data', async function (t) {
const COUNT = 5
test('Create and sync data', { timeout: 100_000 }, async function (t) {
const COUNT = 10
const managers = await createManagers(COUNT, t)
const [invitor, ...invitees] = managers
const disconnect = connectPeers(managers, { discovery: false })
Expand Down Expand Up @@ -252,5 +252,9 @@ test('no sync capabilities === no namespaces sync apart from auth', async (t) =>
t.alike(invitorState[ns].localState, inviteeState[ns].localState)
}

// Temp fix until we have .close() method - waits for indexing idle to ensure
// we don't close storage in teardown while index is still being written.
await Promise.all(projects.map((p) => p.$getProjectSettings()))

await disconnect1()
})
2 changes: 1 addition & 1 deletion test-e2e/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { randomInt } from 'node:crypto'
import { temporaryDirectory } from 'tempy'
import fsPromises from 'node:fs/promises'

const FAST_TESTS = true
const FAST_TESTS = !!process.env.FAST_TESTS
const projectMigrationsFolder = new URL('../drizzle/project', import.meta.url)
.pathname
const clientMigrationsFolder = new URL('../drizzle/client', import.meta.url)
Expand Down
4 changes: 0 additions & 4 deletions tests/datastore.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,6 @@ test('index events', async (t) => {
await dataStore.write(obs)
await idlePromise
const expectedStates = [
{
current: 'idle',
remaining: 0,
},
{
current: 'indexing',
remaining: 1,
Expand Down

0 comments on commit faecad2

Please sign in to comment.