Skip to content

Commit

Permalink
feat(intelligence): add intelligence websocket client
Browse files Browse the repository at this point in the history
  • Loading branch information
zlq4863947 committed Mar 17, 2019
1 parent 1b27bc1 commit d6f3872
Show file tree
Hide file tree
Showing 17 changed files with 220 additions and 28 deletions.
2 changes: 1 addition & 1 deletion modules/common/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "dripjs-common",
"version": "0.1.2",
"version": "0.1.4",
"description": "dripjs common module",
"main": "dist/index.js",
"typings": "dist/index.d.ts",
Expand Down
18 changes: 16 additions & 2 deletions modules/common/socketio/socketio-rxjs.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ describe('SocketIORxjs', () => {
action: 'test',
};
ioRxjs.message$.subscribe((o) => {
ioRxjs.send(['test', msg]);
ioRxjs.send('test', msg);
expect(o).toEqual(msg);
});
ioServer.on('connection', (ws) => {
Expand Down Expand Up @@ -66,6 +66,20 @@ describe('SocketIORxjs', () => {
}, 500);
});

it('subscribe onmessage to next', (done) => {
const msg = {
action: 'test',
};
ioRxjs.message$.subscribe((o) => {
expect(o).toEqual(msg);
});

ioRxjs.next(msg);
setTimeout(() => {
done();
}, 500);
});

it('subscribe onerror', (done) => {
ioServer.on('connection', () => {
ioServer.emit('error', new Error('xxx erorr'));
Expand Down Expand Up @@ -106,7 +120,7 @@ describe('SocketIORxjs', () => {
expect(data).toEqual(msg);
});

ioRxjs.emit('joinRoom', [roomName, joinMsg]);
ioRxjs.emit('joinRoom', roomName, joinMsg);
ioServer.on('connection', (ws) => {
ws.on('joinRoom', (roomNm, data) => {
ws.join(roomNm);
Expand Down
24 changes: 10 additions & 14 deletions modules/common/socketio/socketio-rxjs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,27 @@ export class SocketIORxjs<T = any> {
return this.data$.asObservable();
}

constructor(url: string) {
this.socket = io(url);
constructor(url: string, opts?: SocketIOClient.ConnectOpts) {
this.socket = io(url, opts);
this.socket.on('message', (data: any) => this.data$.next(data));
}

next(data: any): void {
this.data$.next(data);
}

/**
* @param args
*/
send(args: any | any[]): void {
if (args instanceof Array) {
this.socket.send(...args);
} else {
this.socket.send(args);
}
send(...args: any[]): void {
this.socket.send(...args);
}

/**
* @param args
*/
emit(event: string, args: any | any[]): void {
if (args instanceof Array) {
this.socket.emit(event, ...args);
} else {
this.socket.emit(event, args);
}
emit(event: string, ...args: any[]): void {
this.socket.emit(event, ...args);
}

close(): void {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@
"bignumber.js": "^8.1.1",
"config": "^3.0.1",
"dotenv": "^7.0.0",
"dripjs-common": "^0.1.2",
"dripjs-common": "^0.1.4",
"dripjs-exchanges": "^0.1.2",
"dripjs-types": "^0.1.12",
"lodash": "^4.17.11",
Expand Down
1 change: 1 addition & 0 deletions projects/intelligence/client/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './intel-client';
75 changes: 75 additions & 0 deletions projects/intelligence/client/intel-client.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import { INestApplication } from '@nestjs/common';
import { Test } from '@nestjs/testing';
import { ConfigIntelServer, Depth, SupportedExchange, Symbol, Ticker, Transaction } from 'dripjs-types';

import { ApplicationModule, IntelChannel } from '../service';
import { IntelClient } from './intel-client';

// tslint:disable-next-line
const config: ConfigIntelServer = require('config').container.intelService;

describe('intel-client', () => {
let app: INestApplication;
let client: IntelClient;
const exchange = SupportedExchange.Bitmex;
const pair = 'XBTUSD';

beforeAll(async () => {
const serverPort = config.port;
const testingModule = await Test.createTestingModule({
imports: [ApplicationModule],
}).compile();
app = testingModule.createNestApplication();
await app.listenAsync(serverPort);

client = new IntelClient({
...config,
ip: '127.0.0.1',
});
client.connect();
});

afterAll(async () => {
await app.close();
client.disconnect();
});

it('getSymbols', async () => {
const symbols = await client.getSymbols(exchange);
expect(symbols.length).toBeGreaterThan(0);
});

it('tick$', (done) => {
client.ticker$(exchange, pair).subscribe((res) => {
expect(res.channel).toEqual(IntelChannel.Ticker);
expect(res.data).toBeDefined();
expect((<Ticker>res.data).ask).toBeGreaterThan(0);
expect((<Ticker>res.data).bid).toBeGreaterThan(0);
client.stopTicker(exchange, pair);
done();
});
});

it('depth$', (done) => {
client.depth$(exchange, pair).subscribe((res) => {
expect(res.channel).toEqual(IntelChannel.Depth);
expect(res.data).toBeDefined();
expect((<Depth>res.data).asks.length).toBeGreaterThan(20);
expect((<Depth>res.data).bids.length).toBeGreaterThan(20);
client.stopDepth(exchange, pair);
done();
});
});

it('transaction$', (done) => {
client.transaction$(exchange, pair).subscribe((res) => {
expect(res.channel).toEqual(IntelChannel.Transaction);
expect(res.data).toBeDefined();
expect((<Transaction>res.data).amount).toBeGreaterThan(0);
expect((<Transaction>res.data).price).toBeGreaterThan(0);
expect((<Transaction>res.data).time).toBeGreaterThan(0);
client.stopTransaction(exchange, pair);
done();
});
});
});
92 changes: 92 additions & 0 deletions projects/intelligence/client/intel-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import { SocketIORxjs } from 'dripjs-common';
import { SupportedExchange } from 'dripjs-types';
import { Observable } from 'rxjs';
import { filter } from 'rxjs/operators';

import { IntelChannel, IntelRealtimeResponse } from '../service';

export interface IntelClientOptions {
ip: string;
port: number;
username: string;
password: string;
}

export class IntelClient {
private ioRxjs: SocketIORxjs;

constructor(private readonly options: IntelClientOptions) {}

connect(): void {
this.ioRxjs = new SocketIORxjs(`http://${this.options.ip}:${this.options.port}`, {
transportOptions: {
polling: {
extraHeaders: {
username: this.options.username,
password: this.options.password,
},
},
},
});
}

disconnect(): void {
this.ioRxjs.close();
}

async getSymbols(exchange: SupportedExchange): Promise<Symbol[]> {
return new Promise((resolve, reject) => {
try {
this.ioRxjs.emit('symbols', exchange, (res) => {
resolve(res);
});
} catch (error) {
reject(error);
}
});
}

ticker$(exchange: SupportedExchange, symbol: string): Observable<IntelRealtimeResponse> {
const channel = IntelChannel.Ticker;

return this.subscribe(channel, exchange, symbol);
}

stopTicker(exchange: SupportedExchange, symbol: string): void {
const channel = IntelChannel.Ticker;
this.unsubscribe(channel, exchange, symbol);
}

depth$(exchange: SupportedExchange, symbol: string): Observable<IntelRealtimeResponse> {
const channel = IntelChannel.Depth;

return this.subscribe(channel, exchange, symbol);
}

stopDepth(exchange: SupportedExchange, symbol: string): void {
const channel = IntelChannel.Depth;
this.unsubscribe(channel, exchange, symbol);
}

transaction$(exchange: SupportedExchange, symbol: string): Observable<IntelRealtimeResponse> {
const channel = IntelChannel.Transaction;

return this.subscribe(channel, exchange, symbol);
}

stopTransaction(exchange: SupportedExchange, symbol: string): void {
const channel = IntelChannel.Transaction;
this.unsubscribe(channel, exchange, symbol);
}

private subscribe(channel: IntelChannel, exchange: SupportedExchange, symbol: string): Observable<IntelRealtimeResponse> {
this.ioRxjs.socket.on(channel, (res) => this.ioRxjs.next(res));
this.ioRxjs.emit('subscribe', exchange, symbol, channel);

return this.ioRxjs.message$.pipe(filter((res: IntelRealtimeResponse) => res.channel === channel));
}

private unsubscribe(channel: IntelChannel, exchange: SupportedExchange, symbol: string): void {
this.ioRxjs.emit('unsubscribe', exchange, symbol, channel);
}
}
1 change: 1 addition & 0 deletions projects/intelligence/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './core';
export * from './service';
export * from './client';
5 changes: 3 additions & 2 deletions projects/intelligence/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "dripjs-intelligence",
"version": "0.1.8",
"version": "0.1.9",
"description": "dripjs data acquisition service",
"license": "GPL-3.0-or-later",
"main": "dist/index.js",
Expand Down Expand Up @@ -30,12 +30,13 @@
"@nestjs/microservices": "5.7.4",
"@nestjs/testing": "5.7.4",
"@nestjs/websockets": "5.7.4",
"json-value-replacer": "0.1.1",
"class-transformer": "0.2.0",
"class-validator": "0.9.1",
"config": "3.0.1",
"dripjs-common": "^0.1.4",
"dripjs-exchanges": "0.1.2",
"dripjs-types": "0.1.12",
"json-value-replacer": "0.1.1",
"moment": "2.24.0",
"reflect-metadata": "0.1.13",
"rxjs": "6.4.0",
Expand Down
2 changes: 1 addition & 1 deletion projects/intelligence/service/common/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { ExchangeCryptoAuthConfig, RealtimeType, SupportedExchange } from 'dripj

import { BitmexSpy, IntelFactory, Spy } from '../../core';
import { IntelServiceException } from '../exceptions';
import { IntelChannel, IntelRealtimeResponse } from './types';
import { IntelChannel, IntelRealtimeResponse } from '../types';

export function findSpy(exchange: string, config: ExchangeCryptoAuthConfig): Spy {
switch (exchange) {
Expand Down
1 change: 0 additions & 1 deletion projects/intelligence/service/common/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
export * from './types';
export * from './helpers';
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { ArgumentsHost, Catch, ExceptionFilter } from '@nestjs/common';

import { IntelErrorResponse } from '../common';
import { IntelErrorResponse } from '../types';
import { IntelServiceException } from './intel-exception';

@Catch(IntelServiceException)
Expand Down
1 change: 1 addition & 0 deletions projects/intelligence/service/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export * from './app.module';
export * from './types';
2 changes: 1 addition & 1 deletion projects/intelligence/service/intel/intel.gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import { Symbol } from 'dripjs-types';
import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';

import { IntelChannel, IntelRealtimeResponse } from '../common';
import { IntelServiceExceptionFilter } from '../exceptions';
import { AuthGuard } from '../guards';
import { IntelChannel, IntelRealtimeResponse } from '../types';
import { IntelService } from './intel.service';

@WebSocketGateway()
Expand Down
3 changes: 2 additions & 1 deletion projects/intelligence/service/intel/intel.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';

import { Spy } from '../../core';
import { IntelChannel, IntelRealtimeResponse, findSpy, transform } from '../common';
import { findSpy, transform } from '../common';
import { IntelServiceException } from '../exceptions';
import { IntelChannel, IntelRealtimeResponse } from '../types';

// tslint:disable-next-line
const jsonValueReplacer = require('json-value-replacer');
Expand Down
File renamed without changes.
17 changes: 14 additions & 3 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1867,9 +1867,20 @@ dotgitignore@^1.0.3:
minimatch "^3.0.4"

dripjs-common@^0.1.2:
version "0.1.2"
resolved "https://registry.yarnpkg.com/dripjs-common/-/dripjs-common-0.1.2.tgz#ccb5a65914fad00ef3cf3b6a4881d199faab2f72"
integrity sha512-Ji7uyzDJ8ItGKouzn06vaImAAbwM63HfrQxmTbiQKKSxyM0Xohrol+asRpiVWljS/oBI+SvJORUDAqCtJslErA==
version "0.1.3"
resolved "https://registry.yarnpkg.com/dripjs-common/-/dripjs-common-0.1.3.tgz#dad2d00d7ca6cb332946bcc220db455ff0c11ff3"
integrity sha512-Q/xn3LRiKwKPgm2j1aZu+O/8yHBNL5NYemaXuQkykXsA2/ni9dgo/02iE+JDYihPM1y9SYL0XhlbnR78K28AmA==
dependencies:
bignumber.js "^8.1.1"
rxjs "^6.4.0"
socket.io "^2.2.0"
socket.io-client "^2.2.0"
ws "^6.1.3"

dripjs-common@^0.1.4:
version "0.1.4"
resolved "https://registry.yarnpkg.com/dripjs-common/-/dripjs-common-0.1.4.tgz#6f73f4076b1feb6652789307a864232c44cfa82f"
integrity sha512-ktd+ylTAsNLNlix9dZ/25HkHJnDH+dZanlobnLO2vbmjlQevLa/05hqHTwly+vTQ53rff5NFOOlhSC2neJm7RQ==
dependencies:
bignumber.js "^8.1.1"
rxjs "^6.4.0"
Expand Down

0 comments on commit d6f3872

Please sign in to comment.