Skip to content

Commit

Permalink
feat: add project.close() (#375)
Browse files Browse the repository at this point in the history
* initial implementation of project.close()

for now:
  close coreManager cores for every namespace

* add `close` to Datastore, use `coreManager.close()`

the close method of `Datastore` basically `.removeAllListeners()` and
`close()` over the `#coreIndexer` instance. I don't if that should be
enough? But looking at the class, it seems so...

* add sqlite as a private field, `.close()` it on `MapeoProject.close()`

* close dataStores in parallel

* await #coreManager.close and dataStore promises

* update multi-core-indexer to alpha8

* update lock

* fix package-lock, add tests to close

* fix `.getMany` test for `project.close()`

* add tests for creating project after `project.close()`

* * remove 'multiCoreIndexer.removeAllListener()' (the class is already
  removing added listeners)
* comment crud test regarding closing and re-opening a project

* remove cached project in manager after closing project

* remove added listeners in mapeo project after close

* [OPTIC-RELEASE-AUTOMATION] release/v9.0.0-alpha.3 (#404)

Release v9.0.0-alpha.3

Co-authored-by: achou11 <[email protected]>

* Revert "[OPTIC-RELEASE-AUTOMATION] release/v9.0.0-alpha.3 (#404)"

This reverts commit a1b742e.

* update flaky sync e2e test now that project.close() is implemented

* const instead of let in close() method

* fix: close cores after indexing is closed

---------

Co-authored-by: Tomás Ciccola <[email protected]>
Co-authored-by: Andrew Chou <[email protected]>
Co-authored-by: optic-release-automation[bot] <94357573+optic-release-automation[bot]@users.noreply.github.com>
Co-authored-by: achou11 <[email protected]>
Co-authored-by: Gregor MacLennan <[email protected]>
  • Loading branch information
6 people authored Dec 7, 2023
1 parent e3695da commit 06a4335
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 14 deletions.
5 changes: 4 additions & 1 deletion src/datastore/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ export class DataStore extends TypedEmitter {
return decode(block, { coreDiscoveryKey, index })
}

/** @param {Buffer} buf} */
/** @param {Buffer} buf */
async writeRaw(buf) {
const { length } = await this.#writerCore.append(buf)
const index = length - 1
Expand All @@ -219,6 +219,9 @@ export class DataStore extends TypedEmitter {
return block
}

async close() {
await this.#coreIndexer.close()
}
#handleIndexerIdle = () => {
for (const eventName of this.eventNames()) {
if (!(eventName in this.#pendingEmits)) continue
Expand Down
8 changes: 8 additions & 0 deletions src/mapeo-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,10 @@ export class MapeoManager extends TypedEmitter {
projectSecretKey: projectKeypair.secretKey,
})

project.once('close', () => {
this.#activeProjects.delete(projectPublicId)
})

// 5. Write project name and any other relevant metadata to project instance
await project.$setProjectSettings(settings)

Expand Down Expand Up @@ -369,6 +373,10 @@ export class MapeoManager extends TypedEmitter {

const project = this.#createProjectInstance(projectKeys)

project.once('close', () => {
this.#activeProjects.delete(projectPublicId)
})

// 3. Keep track of project instance as we know it's a properly existing project
this.#activeProjects.set(projectPublicId, project)

Expand Down
67 changes: 57 additions & 10 deletions src/mapeo-project.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { decodeBlockPrefix } from '@mapeo/schema'
import { drizzle } from 'drizzle-orm/better-sqlite3'
import { migrate } from 'drizzle-orm/better-sqlite3/migrator'
import { discoveryKey } from 'hypercore-crypto'
import { TypedEmitter } from 'tiny-typed-emitter'

import { CoreManager, NAMESPACES } from './core-manager/index.js'
import { DataStore } from './datastore/index.js'
Expand Down Expand Up @@ -51,7 +52,10 @@ export const kBlobStore = Symbol('blobStore')
export const kProjectReplicate = Symbol('replicate project')
const EMPTY_PROJECT_SETTINGS = Object.freeze({})

export class MapeoProject {
/**
* @extends {TypedEmitter<{ close: () => void }>}
*/
export class MapeoProject extends TypedEmitter {
#projectId
#deviceId
#coreManager
Expand All @@ -60,6 +64,9 @@ export class MapeoProject {
#blobStore
#coreOwnership
#capabilities
/** @ts-ignore */
#ownershipWriteDone
#sqlite
#memberApi
#iconApi
#syncApi
Expand Down Expand Up @@ -97,13 +104,15 @@ export class MapeoProject {
localPeers,
logger,
}) {
super()

this.#l = Logger.create('project', logger)
this.#deviceId = getDeviceId(keyManager)
this.#projectId = projectKeyToId(projectKey)

///////// 1. Setup database
const sqlite = new Database(dbPath)
const db = drizzle(sqlite)
this.#sqlite = new Database(dbPath)
const db = drizzle(this.#sqlite)
migrate(db, { migrationsFolder: projectMigrationsFolder })

///////// 2. Setup random-access-storage functions
Expand All @@ -124,7 +133,7 @@ export class MapeoProject {
projectKey,
keyManager,
storage: coreManagerStorage,
sqlite,
sqlite: this.#sqlite,
logger: this.#l,
})

Expand All @@ -138,7 +147,7 @@ export class MapeoProject {
deviceInfoTable,
iconTable,
],
sqlite,
sqlite: this.#sqlite,
getWinner,
mapDoc: (doc, version) => {
switch (doc.schemaName) {
Expand Down Expand Up @@ -294,17 +303,32 @@ export class MapeoProject {
this.#coreManager.creatorCore.replicate(peer.protomux)
}

/**
* @type {import('./local-peers.js').LocalPeersEvents['peer-add']}
*/
const onPeerAdd = (peer) => {
this.#coreManager.creatorCore.replicate(peer.protomux)
}

/**
* @type {import('./local-peers.js').LocalPeersEvents['discovery-key']}
*/
const onDiscoverykey = (discoveryKey, stream) => {
this.#syncApi[kHandleDiscoveryKey](discoveryKey, stream)
}

// When a new peer is found, try to replicate (if it is not a member of the
// project it will fail the capability check and be ignored)
localPeers.on('peer-add', (peer) => {
this.#coreManager.creatorCore.replicate(peer.protomux)
})
localPeers.on('peer-add', onPeerAdd)

// This happens whenever a peer replicates a core to the stream. SyncApi
// handles replicating this core if we also have it, or requesting the key
// for the core.
localPeers.on('discovery-key', (discoveryKey, stream) => {
this.#syncApi[kHandleDiscoveryKey](discoveryKey, stream)
localPeers.on('discovery-key', onDiscoverykey)

this.once('close', () => {
localPeers.off('peer-add', onPeerAdd)
localPeers.off('discovery-key', onDiscoverykey)
})

this.#l.log('Created project instance %h', projectKey)
Expand Down Expand Up @@ -339,6 +363,29 @@ export class MapeoProject {
return this.#deviceId
}

/**
* Resolves when hypercores have all loaded
*/
async ready() {
await Promise.all([this.#coreManager.ready(), this.#ownershipWriteDone])
}

/**
*/
async close() {
this.#l.log('closing project %h', this.#projectId)
const dataStorePromises = []
for (const dataStore of Object.values(this.#dataStores)) {
dataStorePromises.push(dataStore.close())
}
await Promise.all(dataStorePromises)
await this.#coreManager.close()

this.#sqlite.close()

this.emit('close')
}

/**
* @param {import('multi-core-indexer').Entry[]} entries
* @param {{projectIndexWriter: IndexWriter, sharedIndexWriter: IndexWriter}} indexWriters
Expand Down
66 changes: 66 additions & 0 deletions test-e2e/project-crud.js
Original file line number Diff line number Diff line change
Expand Up @@ -141,5 +141,71 @@ test('CRUD operations', async (t) => {
'expected values returns from getMany()'
)
})

t.test('create, close and then create, update', async (st) => {
const projectId = await manager.createProject()
const project = await manager.getProject(projectId)
const values = new Array(5).fill(null).map(() => {
return getUpdateFixture(value)
})
for (const value of values) {
// @ts-ignore
await project[schemaName].create(value)
}
// @ts-ignore
const written = await project[schemaName].create(value)
await project.close()

await st.exception(async () => {
const updateValue = getUpdateFixture(value)
// @ts-ignore
await project[schemaName].update(written.versionId, updateValue)
}, 'should fail updating since the project is already closed')

await st.exception(async () => {
for (const value of values) {
// @ts-ignore
await project[schemaName].create(value)
}
}, 'should fail creating since the project is already closed')

// @ts-ignore
await st.exception.all(async () => {
await project[schemaName].getMany()
}, 'should fail getting since the project is already closed')
})

t.test('create, read, close, re-open, read', async (st) => {
const projectId = await manager.createProject()

let project = await manager.getProject(projectId)

const values = new Array(5).fill(null).map(() => {
return getUpdateFixture(value)
})

for (const value of values) {
// @ts-ignore
await project[schemaName].create(value)
}

const many1 = await project[schemaName].getMany()
const manyValues1 = many1.map((doc) => valueOf(doc))

// close it
await project.close()

// re-open project
project = await manager.getProject(projectId)

const many2 = await project[schemaName].getMany()
const manyValues2 = many2.map((doc) => valueOf(doc))

st.alike(
stripUndef(manyValues1),
stripUndef(manyValues2),
'expected values returned before closing and after re-opening'
)
})
}
})
4 changes: 1 addition & 3 deletions test-e2e/sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,5 @@ test('no sync capabilities === no namespaces sync apart from auth', async (t) =>

await disconnect1()

// Temp fix until we have .close() method - waits for indexing idle to ensure
// we don't close storage in teardown while index is still being written.
await Promise.all(projects.map((p) => p.$getProjectSettings()))
await Promise.all(projects.map((p) => p.close()))
})

0 comments on commit 06a4335

Please sign in to comment.