Skip to content

Commit

Permalink
Add pub/sub
Browse files Browse the repository at this point in the history
  • Loading branch information
notVitaliy committed Dec 25, 2020
1 parent ee0db88 commit 5037fbc
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 5 deletions.
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@magic8bot/smq",
"version": "1.0.2",
"version": "1.1.0",
"description": "Cryptocurrency trading bot",
"bugs": "https://github.com/magic8bot/smq/issues",
"license": "MIT",
Expand All @@ -19,6 +19,7 @@
},
"dependencies": {
"dotenv": "^8.2.0",
"redis": "^3.0.2",
"rsmq": "^0.12.2"
},
"devDependencies": {
Expand Down
5 changes: 5 additions & 0 deletions src/channels.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,8 @@ export enum Channel {
SyncTrades = 'SyncTrades',
Strategy = 'Strategy',
}

export enum Event {
XCH_TRADE = 'XCH_TRADE',
XCH_TRADE_PREROLL = 'XCH_TRADE_PREROLL',
}
37 changes: 33 additions & 4 deletions src/smq.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import { Channel } from './channels'
import { Channel, Event } from './channels'
import RedisSMQ from 'rsmq'
import redis from 'redis'

const config = {
host: process.env.REDIS_HOST,
port: Number(process.env.REDIS_PORT),
}

const rsmq = new RedisSMQ(config)
const client = redis.createClient(config)
const rsmq = new RedisSMQ({ client })

export class Smq {
constructor() {
Expand All @@ -25,11 +27,38 @@ export class Smq {
return rsmq.sendMessageAsync({ qname, message: JSON.stringify(message) })
}

receiveMessage(qname: Channel) {
return rsmq.receiveMessageAsync({ qname })
async receiveMessage<T>(qname: Channel): Promise<T | false> {
const { message } = (await rsmq.receiveMessageAsync({ qname })) as RedisSMQ.QueueMessage

return this.decode(message)
}

deleteMessage(qname: Channel, id: string) {
return rsmq.deleteMessageAsync({ qname, id })
}

subscribe<T>(event: Event, cb: (data: T | false) => null) {
const handler = (_, message: string) => {
const payload = this.decode<T>(message)
cb(payload)
}

client.subscribe(event, handler)

return () => client.unsubscribe(event, handler)
}

publish(event: Event, message: Record<string, any>) {
const payload = JSON.stringify(message)

client.publish(event, payload)
}

private decode<T>(message: string): T | false {
try {
return JSON.parse(message)
} catch {
return false
}
}
}

0 comments on commit 5037fbc

Please sign in to comment.