Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TSK-825: Client proper reconnection #2797

Merged
merged 2 commits into from
Mar 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 102 additions & 43 deletions packages/core/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import { SortingOrder } from './storage'
import { Tx, TxCreateDoc, TxProcessor, TxUpdateDoc } from './tx'
import { toFindResult } from './utils'

const transactionThreshold = 3000

/**
* @public
*/
Expand All @@ -50,6 +52,7 @@ export interface Client extends Storage {
*/
export interface ClientConnection extends Storage, BackupClient {
close: () => Promise<void>
onConnect?: () => Promise<void>
}

class ClientImpl implements Client, BackupClient {
Expand Down Expand Up @@ -151,75 +154,100 @@ export async function createClient (
allowedPlugins?: Plugin[]
): Promise<Client> {
let client: ClientImpl | null = null

// Temporal buffer, while we apply model
let txBuffer: Tx[] | undefined = []
const loadedTxIds = new Set<Ref<Tx>>()

const hierarchy = new Hierarchy()
const model = new ModelDb(hierarchy)

let lastTx: number

function txHandler (tx: Tx): void {
if (client === null) {
txBuffer?.push(tx)
} else {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
client.updateFromRemote(tx)
}
lastTx = tx.modifiedOn
}
const configs = new Map<Ref<PluginConfiguration>, PluginConfiguration>()

const conn = await connect(txHandler)

await loadModel(conn, loadedTxIds, allowedPlugins, configs, hierarchy, model)

txBuffer = txBuffer.filter((tx) => !loadedTxIds.has(tx._id))

client = new ClientImpl(hierarchy, model, conn)

for (const tx of txBuffer) {
txHandler(tx)
loadedTxIds.add(tx._id)
}
txBuffer = undefined

const oldOnConnect: (() => void) | undefined = conn.onConnect
conn.onConnect = async () => {
// Find all new transactions and apply
await loadModel(conn, loadedTxIds, allowedPlugins, configs, hierarchy, model)

// We need to look for last 1000 transactions and if it is more since lastTx one we receive, we need to perform full refresh.
const atxes = await conn.findAll(
core.class.Tx,
{ modifiedOn: { $gt: lastTx } },
{ sort: { _id: SortingOrder.Ascending }, limit: transactionThreshold }
)
if (atxes.total < transactionThreshold) {
console.log('applying input transactions', atxes.length)
for (const tx of atxes) {
txHandler(tx)
}
} else {
// We need to trigger full refresh on queries, etc.
await oldOnConnect?.()
}
}

return client
}
async function loadModel (
conn: ClientConnection,
processedTx: Set<Ref<Tx>>,
allowedPlugins: Plugin[] | undefined,
configs: Map<Ref<PluginConfiguration>, PluginConfiguration>,
hierarchy: Hierarchy,
model: ModelDb
): Promise<void> {
const t = Date.now()

const atxes = await conn.findAll(
core.class.Tx,
{ objectSpace: core.space.Model },
{ objectSpace: core.space.Model, _id: { $nin: Array.from(processedTx.values()) } },
{ sort: { _id: SortingOrder.Ascending } }
)
console.log('find model', atxes.length, Date.now() - t)

let systemTx: Tx[] = []
const userTx: Tx[] = []
console.log('find' + (processedTx.size === 0 ? 'full model' : 'model diff'), atxes.length, Date.now() - t)

atxes.forEach((tx) => (tx.modifiedBy === core.account.System ? systemTx : userTx).push(tx))

if (allowedPlugins !== undefined) {
// Filter system transactions
const configs = new Map<Ref<PluginConfiguration>, PluginConfiguration>()
for (const t of systemTx) {
if (t._class === core.class.TxCreateDoc) {
const ct = t as TxCreateDoc<Doc>
if (ct.objectClass === core.class.PluginConfiguration) {
configs.set(ct.objectId as Ref<PluginConfiguration>, TxProcessor.createDoc2Doc(ct) as PluginConfiguration)
}
} else if (t._class === core.class.TxUpdateDoc) {
const ut = t as TxUpdateDoc<Doc>
if (ut.objectClass === core.class.PluginConfiguration) {
const c = configs.get(ut.objectId as Ref<PluginConfiguration>)
if (c !== undefined) {
TxProcessor.updateDoc2Doc(c, ut)
}
}
}
}

fillConfiguration(systemTx, configs)
const excludedPlugins = Array.from(configs.values()).filter((it) => !allowedPlugins.includes(it.pluginId as Plugin))

for (const a of excludedPlugins) {
for (const c of configs.values()) {
if (a.pluginId === c.pluginId) {
const excluded = new Set<Ref<Tx>>()
for (const id of c.transactions) {
excluded.add(id as Ref<Tx>)
}
const exclude = systemTx.filter((t) => excluded.has(t._id))
console.log('exclude plugin', c.pluginId, exclude.length)
systemTx = systemTx.filter((t) => !excluded.has(t._id))
}
}
}
systemTx = pluginFilterTx(excludedPlugins, configs, systemTx)
}

const txes = systemTx.concat(userTx)

const txMap = new Map<Ref<Tx>, Ref<Tx>>()
for (const tx of txes) txMap.set(tx._id, tx._id)
for (const tx of txes) {
processedTx.add(tx._id)
}

for (const tx of txes) {
try {
hierarchy.tx(tx)
Expand All @@ -234,13 +262,44 @@ export async function createClient (
console.error('failed to apply model transaction, skipping', JSON.stringify(tx), err)
}
}
}

txBuffer = txBuffer.filter((tx) => txMap.get(tx._id) === undefined)

client = new ClientImpl(hierarchy, model, conn)

for (const tx of txBuffer) txHandler(tx)
txBuffer = undefined
function fillConfiguration (systemTx: Tx[], configs: Map<Ref<PluginConfiguration>, PluginConfiguration>): void {
for (const t of systemTx) {
if (t._class === core.class.TxCreateDoc) {
const ct = t as TxCreateDoc<Doc>
if (ct.objectClass === core.class.PluginConfiguration) {
configs.set(ct.objectId as Ref<PluginConfiguration>, TxProcessor.createDoc2Doc(ct) as PluginConfiguration)
}
} else if (t._class === core.class.TxUpdateDoc) {
const ut = t as TxUpdateDoc<Doc>
if (ut.objectClass === core.class.PluginConfiguration) {
const c = configs.get(ut.objectId as Ref<PluginConfiguration>)
if (c !== undefined) {
TxProcessor.updateDoc2Doc(c, ut)
}
}
}
}
}

return client
function pluginFilterTx (
excludedPlugins: PluginConfiguration[],
configs: Map<Ref<PluginConfiguration>, PluginConfiguration>,
systemTx: Tx[]
): Tx[] {
for (const a of excludedPlugins) {
for (const c of configs.values()) {
if (a.pluginId === c.pluginId) {
const excluded = new Set<Ref<Tx>>()
for (const id of c.transactions) {
excluded.add(id as Ref<Tx>)
}
const exclude = systemTx.filter((t) => excluded.has(t._id))
console.log('exclude plugin', c.pluginId, exclude.length)
systemTx = systemTx.filter((t) => !excluded.has(t._id))
}
}
}
return systemTx
}
1 change: 1 addition & 0 deletions packages/platform/src/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ export async function monitor<T> (status: Status, promise: Promise<T>): Promise<
return result
} catch (err) {
void setPlatformStatus(unknownError(err)) // eslint-disable-line no-void
console.error(err)
throw err
}
}
9 changes: 9 additions & 0 deletions packages/presentation/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,15 @@ export function setClient (_client: Client): void {
}
}

/**
* @public
*/
export function refreshClient (): void {
if (liveQuery !== undefined) {
void liveQuery.refreshConnect()
}
}

/**
* @public
*/
Expand Down
9 changes: 9 additions & 0 deletions packages/query/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ export class LiveQuery extends TxProcessor implements Client {
return this.client.getModel()
}

// Perform refresh of content since connection established.
async refreshConnect (): Promise<void> {
for (const q of [...this.queue]) {
if (!(await this.removeFromQueue(q))) {
await this.refresh(q)
}
}
}

private match (q: Query, doc: Doc): boolean {
if (!this.getHierarchy().isDerived(doc._class, q._class)) {
// Check if it is not a mixin and not match class
Expand Down
2 changes: 1 addition & 1 deletion plugins/client-resources/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Package allow to create a client to interact with running platform.

## Node JS

For NodeJS enviornment it is required to configure ClientSocketFactory using 'ws' package.
For NodeJS environment it is required to configure ClientSocketFactory using 'ws' package.

```ts
// We need to override default WebSocket factory with 'ws' one.
Expand Down
Loading