Skip to content

Commit

Permalink
feat: add support for MONOLITH_TRANSPORTER env var (#29373)
Browse files Browse the repository at this point in the history
Co-authored-by: Debdut Chakraborty <[email protected]>
  • Loading branch information
sampaiodiego and debdutdeb authored Jun 2, 2023
1 parent 585c49f commit 3109a76
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 28 deletions.
11 changes: 11 additions & 0 deletions .changeset/mean-guests-love.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
'@rocket.chat/meteor': minor
---

feat: *Enterprise* Add support for different transporters to connect multiple monolith instances.

To use that, you can use the `TRANSPORTER` env var adding "monolith+" to the transporter value. To use NATS for example, your env var should be:

```bash
export TRANSPORTER="monolith+nats://localhost:4222"
```
15 changes: 15 additions & 0 deletions apps/meteor/ee/server/local-services/instance/getTransporter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
export function getTransporter({ transporter, port }: { transporter?: string; port?: string } = {}) {
if (transporter) {
if (!transporter.match(/^(?:monolith\+)/)) {
throw new Error('invalid transporter');
}

const [, ...url] = transporter.split('+');
return url.join('');
}

return {
port: port ? port.trim() : 0,
udpDiscovery: false,
};
}
65 changes: 37 additions & 28 deletions apps/meteor/ee/server/local-services/instance/service.ts
Original file line number Diff line number Diff line change
@@ -1,35 +1,50 @@
import os from 'os';

import type { BrokerNode } from 'moleculer';
import { ServiceBroker } from 'moleculer';
import { ServiceBroker, Transporters } from 'moleculer';
import { License, ServiceClassInternal } from '@rocket.chat/core-services';
import { InstanceStatus as InstanceStatusRaw } from '@rocket.chat/models';
import { InstanceStatus } from '@rocket.chat/instance-status';

import { StreamerCentral } from '../../../../server/modules/streamer/streamer.module';
import type { IInstanceService } from '../../sdk/types/IInstanceService';
import { getTransporter } from './getTransporter';

export class InstanceService extends ServiceClassInternal implements IInstanceService {
protected name = 'instance';

private broadcastStarted = false;

private transporter: Transporters.TCP | Transporters.NATS;

private isTransporterTCP = true;

private broker: ServiceBroker;

private troubleshootDisableInstanceBroadcast = false;

constructor() {
super();

this.onEvent('watch.instanceStatus', async ({ clientAction, data }): Promise<void> => {
if (clientAction === 'removed') {
return;
}
const tx = getTransporter({ transporter: process.env.TRANSPORTER, port: process.env.TCP_PORT });
if (typeof tx === 'string') {
this.transporter = new Transporters.NATS({ url: tx });
this.isTransporterTCP = false;
} else {
this.transporter = new Transporters.TCP(tx);
}

if (clientAction === 'inserted' && data?.extraInformation?.port) {
this.connectNode(data);
}
});
if (this.isTransporterTCP) {
this.onEvent('watch.instanceStatus', async ({ clientAction, data }): Promise<void> => {
if (clientAction === 'removed') {
return;
}

if (clientAction === 'inserted' && data?.extraInformation?.tcpPort) {
this.connectNode(data);
}
});
}

this.onEvent('license.module', async ({ module, valid }) => {
if (module === 'scalability' && valid) {
Expand Down Expand Up @@ -60,17 +75,9 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe
}

async created() {
const port = process.env.TCP_PORT ? String(process.env.TCP_PORT).trim() : 0;

this.broker = new ServiceBroker({
nodeID: InstanceStatus.id(),
transporter: {
type: 'TCP',
options: {
port,
udpDiscovery: false,
},
},
transporter: this.transporter,
});

this.broker.createService({
Expand Down Expand Up @@ -135,18 +142,20 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe

StreamerCentral.on('broadcast', this.sendBroadcast.bind(this));

await InstanceStatusRaw.find(
{
'extraInformation.tcpPort': {
$exists: true,
if (this.isTransporterTCP) {
await InstanceStatusRaw.find(
{
'extraInformation.tcpPort': {
$exists: true,
},
},
},
{
sort: {
_createdAt: -1,
{
sort: {
_createdAt: -1,
},
},
},
).forEach(this.connectNode.bind(this));
).forEach(this.connectNode.bind(this));
}
}

private connectNode(record: any) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { expect } from 'chai';

import { getTransporter } from '../../../../../../../../server/local-services/instance/getTransporter';

describe('getTransporter', () => {
it('should return TCP with port 0 by default', () => {
expect(getTransporter()).to.deep.equal({ port: 0, udpDiscovery: false });
});

it('should return TCP with port set via env var', () => {
expect(getTransporter({ port: '1234' })).to.deep.equal({ port: '1234', udpDiscovery: false });

expect(getTransporter({ port: ' 1234' })).to.deep.equal({ port: '1234', udpDiscovery: false });

expect(getTransporter({ port: ' 1234 ' })).to.deep.equal({ port: '1234', udpDiscovery: false });
});

it('should throw if transporter set incorrectly', () => {
expect(() => getTransporter({ transporter: 'something' })).to.throw('invalid transporter');
});

it('should return transporter if set correctly', () => {
expect(getTransporter({ transporter: 'monolith+nats://address' })).to.equal('nats://address');
});
});

0 comments on commit 3109a76

Please sign in to comment.