Skip to content

Commit

Permalink
UBERF-7620: Send broadcast on delay with combine
Browse files Browse the repository at this point in the history
1. UBERF-7620: Send broadcast on delay with combine
2. Fix exclude for filtering
3. Fix notification broadcast filtering

Signed-off-by: Andrey Sobolev <[email protected]>
  • Loading branch information
haiodo committed Jul 18, 2024
1 parent a25b2a2 commit fffa854
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 11 deletions.
27 changes: 23 additions & 4 deletions server-plugins/notification-resources/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import core, {
Data,
Doc,
DocumentUpdate,
generateId,
MeasureContext,
MixinUpdate,
Ref,
Expand Down Expand Up @@ -73,8 +74,8 @@ import serverNotification, {
getPersonAccount,
getPersonAccountById,
NOTIFICATION_BODY_SIZE,
UserInfo,
NOTIFICATION_TITLE_SIZE
NOTIFICATION_TITLE_SIZE,
UserInfo
} from '@hcengineering/server-notification'
import serverView from '@hcengineering/server-view'
import { stripTags } from '@hcengineering/text'
Expand Down Expand Up @@ -805,9 +806,27 @@ export async function createCollabDocInfo (

if (info === undefined) continue

res = res.concat(
await getNotificationTxes(control, object, tx, originTx, info, sender, params, notifyContexts, docMessages)
const targetRes = await getNotificationTxes(
control,
object,
tx,
originTx,
info,
sender,
params,
notifyContexts,
docMessages
)
const ids = new Set(res.map((it) => it._id))
if (info.account?.email !== undefined) {
const id = generateId() as string
control.operationContext.derived.targets[id] = (it) => {
if (ids.has(it._id)) {
return [info.account?.email as string]
}
}
}
res = res.concat(targetRes)
}
return res
}
Expand Down
7 changes: 5 additions & 2 deletions server/core/src/server/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -781,13 +781,16 @@ export class TServerStorage implements ServerStorage {
this.options.branding,
true
)
const result = await performAsync(applyCtx)
const aresult = await performAsync(applyCtx)

if (applyTxes.length > 0) {
await this.apply(applyCtx, applyTxes)
}
// We need to broadcast changes
await this.broadcastCtx(applyCtx.derived.txes.concat(result), applyCtx.derived.targets)
const combinedTxes = applyCtx.derived.txes.concat(aresult)
if (combinedTxes.length > 0) {
await this.broadcastCtx(combinedTxes, applyCtx.derived.targets)
}
},
{ count: txes.length }
)
Expand Down
53 changes: 52 additions & 1 deletion server/ws/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import core, {
AccountRole,
TxFactory,
TxProcessor,
reduceCalls,
toIdMap,
type Account,
type Class,
Expand All @@ -36,7 +37,14 @@ import core, {
} from '@hcengineering/core'
import { SessionContextImpl, createBroadcastEvent, type Pipeline } from '@hcengineering/server-core'
import { type Token } from '@hcengineering/server-token'
import { type ClientSessionCtx, type Session, type SessionRequest, type StatisticsElement } from './types'
import {
type ClientSessionCtx,
type ConnectionSocket,
type Session,
type SessionRequest,
type StatisticsElement
} from './types'
import { handleSend } from './utils'
/**
* @public
*/
Expand All @@ -48,6 +56,8 @@ export class ClientSession implements Session {
sessionId = ''
lastRequest = Date.now()

broadcastTx: Tx[] = []

total: StatisticsElement = { find: 0, tx: 0 }
current: StatisticsElement = { find: 0, tx: 0 }
mins5: StatisticsElement = { find: 0, tx: 0 }
Expand Down Expand Up @@ -297,6 +307,47 @@ export class ClientSession implements Session {
void handleSend(toSendAll, undefined, Array.from(toSendTarget.keys()))
}

doBroadcast = reduceCalls(async (ctx: MeasureContext, socket: ConnectionSocket) => {
if (this.broadcastTx.length > 10000) {
const classes = new Set<Ref<Class<Doc>>>()
for (const dtx of this.broadcastTx) {
if (TxProcessor.isExtendsCUD(dtx._class)) {
classes.add((dtx as TxCUD<Doc>).objectClass)
}
const etx = TxProcessor.extractTx(dtx)
if (TxProcessor.isExtendsCUD(etx._class)) {
classes.add((etx as TxCUD<Doc>).objectClass)
}
}
const bevent = createBroadcastEvent(Array.from(classes))
this.broadcastTx = []
await socket.send(
ctx,
{
result: [bevent]
},
this.binaryMode,
this.useCompression
)
} else {
const txes = [...this.broadcastTx]
this.broadcastTx = []
await handleSend(ctx, socket, { result: txes }, 32 * 1024, this.binaryMode, this.useCompression)
}
})

timeout: any

broadcast (ctx: MeasureContext, socket: ConnectionSocket, tx: Tx[]): void {
this.broadcastTx.push(...tx)
// We need to put into client broadcast queue, to send user requests first
// Collapse events in 1 second interval
clearTimeout(this.timeout)
this.timeout = setTimeout(() => {
void this.doBroadcast(ctx, socket)
}, 5)
}

private async sendWithPart (
derived: Tx[],
ctx: ClientSessionCtx,
Expand Down
8 changes: 4 additions & 4 deletions server/ws/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -510,11 +510,11 @@ class TSessionManager implements SessionManager {

const sessions = [...workspace.sessions.values()]
const ctx = this.ctx.newChild('📭 broadcast', {})
function send (): void {
const send = (): void => {
for (const sessionRef of sessions) {
const tt = sessionRef.session.getUser()
if ((target === undefined && !(exclude ?? []).includes(tt)) || (target?.includes(tt) ?? false)) {
void sendResponse(ctx, sessionRef.session, sessionRef.socket, { result: resp })
sessionRef.session.broadcast(ctx, sessionRef.socket, resp)
}
}
ctx.end()
Expand Down Expand Up @@ -546,8 +546,8 @@ class TSessionManager implements SessionManager {
pipelineCtx,
{ ...token.workspace, workspaceUrl, workspaceName },
upgrade,
(tx, targets) => {
this.broadcastAll(workspace, tx, targets)
(tx, targets, exclude) => {
this.broadcastAll(workspace, tx, targets, exclude)
},
branding
),
Expand Down
4 changes: 4 additions & 0 deletions server/ws/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ export interface Session {

requests: Map<string, SessionRequest>

broadcastTx: Tx[]

binaryMode: boolean
useCompression: boolean
total: StatisticsElement
Expand All @@ -81,6 +83,8 @@ export interface Session {
isUpgradeClient: () => boolean

getMode: () => string

broadcast: (ctx: MeasureContext, socket: ConnectionSocket, tx: Tx[]) => void
}

/**
Expand Down

0 comments on commit fffa854

Please sign in to comment.