Skip to content

Commit

Permalink
TSK-825: Client proper reconnection
Browse files Browse the repository at this point in the history
Signed-off-by: Andrey Sobolev <[email protected]>
  • Loading branch information
haiodo committed Mar 22, 2023
1 parent b0abf93 commit 7ff652d
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 84 deletions.
1 change: 1 addition & 0 deletions packages/platform/src/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ export async function monitor<T> (status: Status, promise: Promise<T>): Promise<
return result
} catch (err) {
void setPlatformStatus(unknownError(err)) // eslint-disable-line no-void
console.error(err)
throw err
}
}
2 changes: 1 addition & 1 deletion plugins/client-resources/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Package allow to create a client to interact with running platform.

## Node JS

For NodeJS enviornment it is required to configure ClientSocketFactory using 'ws' package.
For NodeJS environment it is required to configure ClientSocketFactory using 'ws' package.

```ts
// We need to override default WebSocket factory with 'ws' one.
Expand Down
123 changes: 89 additions & 34 deletions plugins/client-resources/src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,25 @@ import core, {
generateId,
Ref,
Tx,
TxApplyIf,
TxHandler,
TxResult
} from '@hcengineering/core'
import { getMetadata, PlatformError, readResponse, ReqId, serialize, UNAUTHORIZED } from '@hcengineering/platform'
import {
getMetadata,
PlatformError,
readResponse,
ReqId,
serialize,
UNAUTHORIZED,
unknownError
} from '@hcengineering/platform'

class DeferredPromise {
class RequestPromise {
readonly promise: Promise<any>
resolve!: (value?: any) => void
reject!: (reason?: any) => void
reconnect?: () => void
constructor () {
this.promise = new Promise((resolve, reject) => {
this.resolve = resolve
Expand All @@ -46,7 +56,7 @@ class DeferredPromise {

class Connection implements ClientConnection {
private websocket: ClientSocket | Promise<ClientSocket> | null = null
private readonly requests = new Map<ReqId, DeferredPromise>()
private readonly requests = new Map<ReqId, RequestPromise>()
private lastId = 0
private readonly interval: number
private readonly sessionId = generateId() as string
Expand All @@ -60,17 +70,19 @@ class Connection implements ClientConnection {
console.log('connection created')
this.interval = setInterval(() => {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
this.sendRequest('ping')
this.sendRequest({ method: 'ping', params: [] })
}, 10000)
}

async close (): Promise<void> {
clearInterval(this.interval)
if (this.websocket !== null) {
if (this.websocket instanceof Promise) {
this.websocket = await this.websocket
await this.websocket.then((ws) => ws.close())
} else {
this.websocket.close()
}
this.websocket.close()
this.websocket = null
}
}

Expand All @@ -83,7 +95,7 @@ class Connection implements ClientConnection {
return conn
} catch (err: any) {
console.log('failed to connect', err)
if (err.code === UNAUTHORIZED.code) {
if (err?.code === UNAUTHORIZED.code) {
this.onUnauthorized?.()
throw err
}
Expand All @@ -109,15 +121,28 @@ class Connection implements ClientConnection {
getMetadata(client.metadata.ClientSocketFactory) ?? ((url: string) => new WebSocket(url) as ClientSocket)

const websocket = clientSocketFactory(this.url + `?sessionId=${this.sessionId}`)
const opened = false
const socketId = this.sockets++

setTimeout(() => {
if (!opened) {
websocket.close()
reject(new PlatformError(unknownError('timeout')))
}
}, 20000)

websocket.onmessage = (event: MessageEvent) => {
const resp = readResponse(event.data)
if (resp.id === -1 && resp.result === 'hello') {
if (resp.error !== undefined) {
reject(resp.error)
return
}
for (const [, v] of this.requests.entries()) {
v.reconnect?.()
}
resolve(websocket)

return
}
if (resp.id !== undefined) {
Expand All @@ -144,13 +169,14 @@ class Connection implements ClientConnection {
})
)
this.onUpgrade?.()
return
}
this.handler(tx)
}
}
websocket.onclose = () => {
console.log('client websocket closed', socketId)
// clearInterval(interval)
websocket.onclose = (ev) => {
console.log('client websocket closed', socketId, ev?.reason)

if (!(this.websocket instanceof Promise)) {
this.websocket = null
}
Expand All @@ -167,30 +193,47 @@ class Connection implements ClientConnection {
)
}
websocket.onerror = (event: any) => {
console.log('client websocket error:', socketId, JSON.stringify(event))
console.error('client websocket error:', socketId, event)
reject(new Error(`websocket error:${socketId}`))
}
})
}

private async sendRequest (method: string, ...params: any[]): Promise<any> {
if (this.websocket instanceof Promise) {
this.websocket = await this.websocket
private async sendRequest (data: {
method: string
params: any[]
// If not defined, on reconnect with timeout, will retry automatically.
retry?: () => Promise<boolean>
}): Promise<any> {
const id = this.lastId++
const promise = new RequestPromise()

const sendData = async (): Promise<void> => {
if (this.websocket instanceof Promise) {
this.websocket = await this.websocket
}
if (this.websocket === null) {
this.websocket = this.waitOpenConnection()
this.websocket = await this.websocket
}
this.requests.set(id, promise)
this.websocket.send(
serialize({
method: data.method,
params: data.params,
id
})
)
}
if (this.websocket === null) {
this.websocket = this.waitOpenConnection()
this.websocket = await this.websocket
promise.reconnect = () => {
setTimeout(async () => {
// In case we don't have response yet.
if (this.requests.has(id) && ((await data.retry?.()) ?? true)) {
await sendData()
}
}, 500)
}
const id = this.lastId++
this.websocket.send(
serialize({
method,
params,
id
})
)
const promise = new DeferredPromise()
this.requests.set(id, promise)
await sendData()
return await promise.promise
}

Expand All @@ -199,31 +242,43 @@ class Connection implements ClientConnection {
query: DocumentQuery<T>,
options?: FindOptions<T>
): Promise<FindResult<T>> {
return this.sendRequest('findAll', _class, query, options)
return this.sendRequest({ method: 'findAll', params: [_class, query, options] })
}

tx (tx: Tx): Promise<TxResult> {
return this.sendRequest('tx', tx)
return this.sendRequest({
method: 'tx',
params: [tx],
retry: async () => {
if (tx._class === core.class.TxApplyIf) {
return (
(await (await this.findAll(core.class.Tx, { _id: (tx as TxApplyIf).txes[0]._id }, { limit: 1 })).length) ===
0
)
}
return (await (await this.findAll(core.class.Tx, { _id: tx._id }, { limit: 1 })).length) === 0
}
})
}

loadChunk (domain: Domain, idx?: number): Promise<DocChunk> {
return this.sendRequest('loadChunk', domain, idx)
return this.sendRequest({ method: 'loadChunk', params: [domain, idx] })
}

closeChunk (idx: number): Promise<void> {
return this.sendRequest('closeChunk', idx)
return this.sendRequest({ method: 'closeChunk', params: [idx] })
}

loadDocs (domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {
return this.sendRequest('loadDocs', domain, docs)
return this.sendRequest({ method: 'loadDocs', params: [domain, docs] })
}

upload (domain: Domain, docs: Doc[]): Promise<void> {
return this.sendRequest('upload', domain, docs)
return this.sendRequest({ method: 'upload', params: [domain, docs] })
}

clean (domain: Domain, docs: Ref<Doc>[]): Promise<void> {
return this.sendRequest('clean', domain, docs)
return this.sendRequest({ method: 'clean', params: [domain, docs] })
}
}

Expand Down
60 changes: 21 additions & 39 deletions plugins/client-resources/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,8 @@ import { connect } from './connection'

export { connect }

/*!
* Anticrm Platform™ Client Plugin
* © 2020, 2021 Anticrm Platform Contributors. All Rights Reserved.
* Licensed under the Eclipse Public License, Version 2.0
*/
// eslint-disable-next-line @typescript-eslint/explicit-function-return-type
export default async () => {
let _token: string | undefined
let client: Promise<Client> | Client | undefined

return {
function: {
GetClient: async (
Expand All @@ -38,38 +30,28 @@ export default async () => {
onUpgrade?: () => void,
onUnauthorized?: () => void
): Promise<Client> => {
if (client instanceof Promise) {
client = await client
}
if (token !== _token && client !== undefined) {
await client.close()
client = undefined
}
if (client === undefined) {
const filterModel = getMetadata(clientPlugin.metadata.FilterModel) ?? false
client = createClient(
(handler: TxHandler) => {
const url = new URL(`/${token}`, endpoint)
console.log('connecting to', url.href)
return connect(url.href, handler, onUpgrade, onUnauthorized)
},
filterModel ? getPlugins() : undefined
)
_token = token
const filterModel = getMetadata(clientPlugin.metadata.FilterModel) ?? false

// Check if we had dev hook for client.
const hook = getMetadata(clientPlugin.metadata.ClientHook)
if (hook !== undefined) {
const hookProc = await getResource(hook)
const _client = client
client = new Promise((resolve, reject) => {
_client
.then((res) => {
resolve(hookProc(res))
})
.catch((err) => reject(err))
})
}
let client = createClient(
(handler: TxHandler) => {
const url = new URL(`/${token}`, endpoint)
console.log('connecting to', url.href)
return connect(url.href, handler, onUpgrade, onUnauthorized)
},
filterModel ? getPlugins() : undefined
)
// Check if we had dev hook for client.
const hook = getMetadata(clientPlugin.metadata.ClientHook)
if (hook !== undefined) {
const hookProc = await getResource(hook)
const _client = client
client = new Promise((resolve, reject) => {
_client
.then((res) => {
resolve(hookProc(res))
})
.catch((err) => reject(err))
})
}
return await client
}
Expand Down
24 changes: 18 additions & 6 deletions plugins/workbench-resources/src/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import { fetchMetadataLocalStorage, getCurrentLocation, navigate, setMetadataLoc

export let versionError: string | undefined = ''

let _token: string | undefined
let _client: Client | undefined

export async function connect (title: string): Promise<Client | undefined> {
const loc = getCurrentLocation()
const ws = loc.path[1]
Expand All @@ -27,8 +30,17 @@ export async function connect (title: string): Promise<Client | undefined> {
return
}

const getClient = await getResource(client.function.GetClient)
const instance = await getClient(
if (_token !== token && _client !== undefined) {
await _client.close()
_client = undefined
}
if (_client !== undefined) {
return _client
}
_token = token

const clientFactory = await getResource(client.function.GetClient)
_client = await clientFactory(
token,
endpoint,
() => {
Expand All @@ -44,7 +56,7 @@ export async function connect (title: string): Promise<Client | undefined> {
)
console.log('logging in as', email)

const me = await instance.findOne(contact.class.EmployeeAccount, { email })
const me = await _client.findOne(contact.class.EmployeeAccount, { email })
if (me !== undefined) {
console.log('login: employee account', me)
setCurrentAccount(me)
Expand All @@ -59,7 +71,7 @@ export async function connect (title: string): Promise<Client | undefined> {
}

try {
const version = await instance.findOne<Version>(core.class.Version, {})
const version = await _client.findOne<Version>(core.class.Version, {})
console.log('Model version', version)

const requirdVersion = getMetadata(presentation.metadata.RequiredVersion)
Expand All @@ -85,8 +97,8 @@ export async function connect (title: string): Promise<Client | undefined> {
// Update window title
document.title = [ws, title].filter((it) => it).join(' - ')

setClient(instance)
return instance
setClient(_client)
return _client
}
function clearMetadata (ws: string): void {
const tokens = fetchMetadataLocalStorage(login.metadata.LoginTokens)
Expand Down
Loading

0 comments on commit 7ff652d

Please sign in to comment.