-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsmq.ts
75 lines (57 loc) · 1.97 KB
/
smq.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import { Channel, Event } from './channels'
import RedisSMQ from 'rsmq'
import redis from 'redis'
// Redis connection settings are read from the environment when this module
// loads. `Number(undefined)` is NaN, so `port` is NaN if REDIS_PORT is unset.
const { REDIS_HOST: host, REDIS_PORT: rawPort } = process.env
const config = { host, port: Number(rawPort) }

// Redis client created from the settings above.
const client = redis.createClient(config)

// RSMQ instance constructed with that client.
const rsmq = new RedisSMQ({ client })
/**
 * Wrapper around the module-level RSMQ instance and Redis client. It exposes
 * queue operations (send / receive / delete / visibility) and a JSON-encoded
 * publish/subscribe API.
 */
export class Smq {
  /**
   * Connection reserved for pub/sub, created by the first `subscribe` call.
   *
   * NOTE(review): this assumes the callback-style node-redis (v3) client, where
   * a connection that has issued SUBSCRIBE only accepts subscription commands.
   * Subscribing on the shared `client` would therefore break RSMQ calls and
   * `publish`, so subscriptions get their own duplicated connection.
   */
  private static subscriber?: typeof client

  /** Handlers registered through `subscribe`, grouped by Redis channel name. */
  private static readonly handlers = new Map<string, Set<(message: string) => void>>()

  constructor() {
    // A constructor cannot await, so initialisation is fire-and-forget. The
    // rejection is logged instead of being left as an unhandled rejection.
    this.init().catch((err: unknown) => {
      console.error('Smq: queue initialisation failed', err)
    })
  }

  /**
   * Creates every queue named by a `Channel` value that RSMQ does not list yet.
   *
   * @returns `undefined` when nothing is missing, otherwise the settled results
   *   of the individual `createQueueAsync` calls.
   */
  async init() {
    const existing = await rsmq.listQueuesAsync()
    // The other methods pass `Channel` values to RSMQ as `qname`, so the values
    // (not the enum member names) are the queue names that have to exist.
    const missing = Object.values(Channel).filter((qname) => !existing.includes(qname))
    if (!missing.length) return
    // allSettled: one failed creation does not prevent the others from running.
    return Promise.allSettled(missing.map((qname) => rsmq.createQueueAsync({ qname })))
  }

  /**
   * Serialises `message` as JSON and sends it to the given queue.
   *
   * @param qname Target queue.
   * @param message Payload to serialise.
   * @returns The promise returned by `rsmq.sendMessageAsync`.
   */
  sendMessage(qname: Channel, message: Record<string, any>) {
    return rsmq.sendMessageAsync({ qname, message: JSON.stringify(message) })
  }

  /**
   * Receives the next message from a queue and JSON-decodes its body.
   *
   * @param qname Queue to read from.
   * @returns The message id and the decoded payload; `message` is `false` when
   *   the body is not valid JSON.
   */
  async receiveMessage<T>(qname: Channel): Promise<{ id: string; message: T | false }> {
    // NOTE(review): per the rsmq docs an empty queue resolves with `{}`. In that
    // case `id` is `undefined` at runtime (despite the declared type) and
    // `message` is `false`, because decoding `undefined` fails.
    const { id, message } = (await rsmq.receiveMessageAsync({ qname })) as RedisSMQ.QueueMessage
    return { id, message: this.decode<T>(message) }
  }

  /**
   * Changes the visibility timeout of a message.
   *
   * @param qname Queue holding the message (typed `string` here, unlike the
   *   sibling methods which take `Channel`).
   * @param id Message id.
   * @param vt New visibility timeout, passed through to RSMQ unchanged.
   * @returns The promise returned by `rsmq.changeMessageVisibilityAsync`.
   */
  changeMessageVisibility(qname: string, id: string, vt: number) {
    return rsmq.changeMessageVisibilityAsync({ qname, id, vt })
  }

  /**
   * Deletes a message from a queue.
   *
   * @param qname Queue holding the message.
   * @param id Message id.
   * @returns The promise returned by `rsmq.deleteMessageAsync`.
   */
  deleteMessage(qname: Channel, id: string) {
    return rsmq.deleteMessageAsync({ qname, id })
  }

  /**
   * Subscribes `cb` to the pub/sub channel derived from `event` and `suffix`.
   *
   * @param event Event name; forms the channel name together with `suffix`.
   * @param suffix Optional channel suffix; an empty string selects the bare event.
   * @param cb Called with the JSON-decoded payload of each message on that
   *   channel, or `false` when a payload is not valid JSON.
   * @returns A function that removes this subscription. Calling it more than
   *   once is a no-op. The Redis channel itself is unsubscribed only when its
   *   last handler is removed.
   */
  subscribe<T>(event: Event, suffix: string, cb: (data: T | false) => void) {
    const channel = this.getChannel(suffix, event)
    const subscriber = Smq.getSubscriber()
    const handle = (message: string) => {
      cb(this.decode<T>(message))
    }

    let registered = Smq.handlers.get(channel)
    if (!registered) {
      registered = new Set()
      Smq.handlers.set(channel, registered)
      // First handler for this channel: issue the actual SUBSCRIBE.
      subscriber.subscribe(channel)
    }
    registered.add(handle)

    return () => {
      const current = Smq.handlers.get(channel)
      if (!current || !current.delete(handle)) return
      if (current.size === 0) {
        Smq.handlers.delete(channel)
        subscriber.unsubscribe(channel)
      }
    }
  }

  /**
   * Publishes `message`, serialised as JSON, on the channel derived from
   * `event` and `suffix`.
   *
   * @param event Event name.
   * @param suffix Optional channel suffix; an empty string selects the bare event.
   * @param message Payload to serialise.
   */
  publish(event: Event, suffix: string, message: Record<string, any>) {
    const payload = JSON.stringify(message)
    const channel = this.getChannel(suffix, event)
    client.publish(channel, payload)
  }

  /**
   * Returns the pub/sub connection, creating it on first use and wiring a
   * single `'message'` listener that dispatches to the handlers by channel.
   */
  private static getSubscriber() {
    if (!Smq.subscriber) {
      const connection = client.duplicate()
      connection.on('message', (channel: string, message: string) => {
        Smq.handlers.get(channel)?.forEach((handle) => handle(message))
      })
      Smq.subscriber = connection
    }
    return Smq.subscriber
  }

  /** Builds the channel name: the bare event, or `event.suffix` when a suffix is given. */
  private getChannel(suffix: string, event: Event) {
    return !suffix ? event : `${event}.${suffix}`
  }

  /** Parses `message` as JSON; returns `false` instead of throwing when parsing fails. */
  private decode<T>(message: string): T | false {
    try {
      return JSON.parse(message)
    } catch {
      return false
    }
  }
}