Skip to content

Commit

Permalink
feat(intelligence): add websocket service
Browse files Browse the repository at this point in the history
  • Loading branch information
zlq4863947 committed Mar 9, 2019
1 parent 00b7536 commit 1996218
Show file tree
Hide file tree
Showing 24 changed files with 1,489 additions and 35 deletions.
19 changes: 19 additions & 0 deletions config/default.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,25 @@ module.exports = {
},
entities: ['dist/modules/models/entity/**/*.entity.js'],
},
container: {
intelService: {
port: 6531,
username: 'test',
password: 'test',
}
},
exchange: {
crypto: {
bitmex: {
apiKey: process.env.SPEC_BITMEX_REAL_API_KEY,
apiSecret: process.env.SPEC_BITMEX_REAL_API_SECRET,
},
bitmexTestNet: {
apiKey: process.env.SPEC_BITMEX_TEST_API_KEY,
apiSecret: process.env.SPEC_BITMEX_TEST_API_SECRET,
},
}
},
log: {
typeorm: true,
},
Expand Down
1 change: 1 addition & 0 deletions modules/common/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './websocket';
export * from './big-number-util';
export * from './nest-util';
9 changes: 9 additions & 0 deletions modules/common/nest-util.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { INestApplication } from '@nestjs/common';
import { ModuleMetadata } from '@nestjs/common/interfaces';
import { Test } from '@nestjs/testing';

export async function createNestTestApplication(metadata: ModuleMetadata): Promise<INestApplication> {
const testingModule = await Test.createTestingModule(metadata).compile();

return testingModule.createNestApplication();
}
10 changes: 8 additions & 2 deletions modules/common/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "dripjs-common",
"version": "0.1.0",
"version": "0.1.1",
"description": "dripjs common module",
"main": "dist/index.js",
"typings": "dist/index.d.ts",
Expand Down Expand Up @@ -29,10 +29,16 @@
},
"homepage": "https://github.com/zlq4863947/dripjs/tree/master/modules/common#readme",
"devDependencies": {
"@types/ws": "6.0.1"
"@types/express": "4.16.1",
"@types/socket.io": "2.1.2",
"@types/socket.io-client": "1.4.32",
"@types/ws": "6.0.1",
"express": "4.16.4",
"socket.io-client": "2.2.0"
},
"dependencies": {
"rxjs": "^6.4.0",
"socket.io": "^2.2.0",
"ws": "^6.1.3"
}
}
1 change: 1 addition & 0 deletions modules/common/socketio/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './socketio-rxjs';
144 changes: 144 additions & 0 deletions modules/common/socketio/socketio-rxjs.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import * as http from 'http';

import * as express from 'express';
import * as socketIO from 'socket.io';

import { SocketIORxjs } from './socketio-rxjs';

describe('SocketIORxjs', () => {
let ioServer: socketIO.Server;
let ioRxjs: SocketIORxjs;

beforeEach(() => {
const port = 8800;
const app = express();
const server = http.createServer(app);
ioServer = socketIO(server).listen(port);
ioRxjs = new SocketIORxjs(`http://localhost:${port}`);
});

afterEach(() => {
ioRxjs.close();
ioServer.close();
});

it('subscribe onopen', (done) => {
const msg = {
action: 'test',
};
ioRxjs.message$.subscribe((o) => {
ioRxjs.send(['test', msg]);
expect(o).toEqual(msg);
});
ioServer.on('connection', (ws) => {
ws.send(msg);
});
setTimeout(() => {
done();
}, 500);
});

it('send message', (done) => {
const msg = {
action: 'test',
};
ioRxjs.message$.subscribe((o) => {
ioRxjs.send(msg);
});

ioServer.on('connection', (ws) => {
ws.send(msg);
ws.on('message', (data) => {
expect(data).toEqual(msg);
});
});
setTimeout(() => {
done();
}, 500);
});

it('subscribe onmessage to exception', (done) => {
ioServer.on('connection', (ws) => {
ws.send('t');
});
setTimeout(() => {
done();
}, 500);
});

it('subscribe onerror', (done) => {
ioServer.on('connection', () => {
ioServer.emit('error', new Error('xxx erorr'));
});
setTimeout(() => {
done();
}, 500);
});

it('subscribe connect_timeout', (done) => {
ioServer.on('connection', () => {
ioServer.emit('connect_timeout', new Error('erorr: cant not connection'));
});
setTimeout(() => {
done();
}, 500);
});

it('subscribe connect_error', (done) => {
const ws = new SocketIORxjs(`http://localhost:8801`);

expect(ws).toBeDefined();
setTimeout(() => {
done();
}, 500);
});

it('join room', (done) => {
const roomName = 'event_room';
const msg = {
toEveryone: 'Hi',
};
const joinMsg = {
name: 'jack',
};

ioRxjs.message$.subscribe((data) => {
expect(data).toEqual(msg);
});

ioRxjs.emit('joinRoom', [roomName, joinMsg]);
ioServer.on('connection', (ws) => {
ws.on('joinRoom', (roomNm, data) => {
ws.join(roomNm);
expect(data).toEqual(joinMsg);
});
});
setTimeout(() => {
ioServer.to(roomName).emit('message', msg);
}, 500);
setTimeout(() => {
done();
}, 1000);
});

it('custom event', (done) => {
const event = 'foo';
const msg = {
toEveryone: 'Hi',
};
ioRxjs.message$.subscribe((data) => {
expect(data).toEqual('ok');
});

ioRxjs.emit(event, msg);
ioServer.on('connection', (ws) => {
ws.on(event, (data) => {
ioServer.send('ok');
expect(data).toEqual(msg);
});
});
setTimeout(() => {
done();
}, 500);
});
});
79 changes: 79 additions & 0 deletions modules/common/socketio/socketio-rxjs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import { Observable, ReplaySubject } from 'rxjs';
import { filter, take } from 'rxjs/operators';
import * as io from 'socket.io-client';

export class SocketIORxjs<T = any> {
readonly socket: SocketIOClient.Socket;
private readonly data$ = new ReplaySubject<T>(1);
private readonly opened$ = new ReplaySubject<boolean>(1);

/**
* message stream
*/
get message$(): Observable<T> {
return this.data$.asObservable();
}

constructor(url: string) {
this.socket = io(url);
this.socket.on('connect', () => {
this.opened$.next(true);
});
this.socket.on('connect_timeout', () => {
this.opened$.next(false);
});
this.socket.on('connect_error', () => {
this.opened$.next(false);
});
this.socket.on('disconnect', () => {
this.opened$.next(false);
});
this.socket.on('error', () => {
this.opened$.next(false);
});
this.socket.on('message', (data: any) => this.data$.next(data));
}

/**
* @param args
*/
send(args: any | any[]): void {
// wait until socket open and send the text only once per call
this.opened$
.pipe(
take(1),
filter((opened) => opened),
)
.subscribe(() => {
if (args instanceof Array) {
this.socket.send(...args);
} else {
this.socket.send(args);
}
});
}

/**
* @param args
*/
emit(event: string, args: any | any[]): void {
// wait until socket open and send the text only once per call
this.opened$
.pipe(
take(1),
filter((opened) => opened),
)
.subscribe(() => {
if (args instanceof Array) {
this.socket.emit(event, ...args);
} else {
this.socket.emit(event, args);
}
});
}

close(): void {
this.socket.close();
this.data$.complete();
}
}
36 changes: 36 additions & 0 deletions modules/types/config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { ConnectionOptions } from 'typeorm';

export interface Config {
// config file name. for debug purpose.
env: string;
production: boolean;
database: Partial<ConnectionOptions>;
container: ConfigContainer;
exchange: ConfigExchange;
log: {
typeorm: boolean;
};
}

export interface ConfigContainer {
intelService: ConfigIntelServer;
}

export interface ConfigIntelServer {
port: number;
username: string;
password: number;
}

export interface ConfigExchange {
[type: string]: ConfigExchangeCrypto;
}

export interface ConfigExchangeCrypto {
[exchange: string]: ExchangeCryptoAuthConfig;
}

export interface ExchangeCryptoAuthConfig {
apiKey: string;
apiSecret: string;
}
1 change: 1 addition & 0 deletions modules/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ export * from './exchange';
export * from './intelligence';
export * from './http';
export * from './order';
export * from './config';
2 changes: 1 addition & 1 deletion modules/types/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "dripjs-types",
"version": "0.1.3",
"version": "0.1.7",
"description": "dripjs types",
"main": "dist/index.js",
"typings": "dist/index.d.ts",
Expand Down
Loading

0 comments on commit 1996218

Please sign in to comment.