Skip to content

Commit

Permalink
Merge branch 'dev' into add-error-class
Browse files Browse the repository at this point in the history
  • Loading branch information
su-chang authored Jan 12, 2022
2 parents ae207a4 + 71821c7 commit 6659566
Show file tree
Hide file tree
Showing 7 changed files with 311 additions and 79 deletions.
76 changes: 0 additions & 76 deletions .vscode/settings.json

This file was deleted.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "wechaty-puppet-whatsapp",
"version": "1.11.26",
"version": "1.11.27",
"description": "Wechaty Puppet for WhatsApp",
"type": "module",
"exports": {
Expand Down
7 changes: 5 additions & 2 deletions src/puppet-whatsapp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import {
import WAWebJS, { ClientOptions, GroupChat } from 'whatsapp-web.js'
import WAError from './pure-function-helpers/error-type.js'
import { WXWORK_ERROR_TYPE } from './schema/error-type.js'
import { Manager } from './work/manager.js'
// @ts-ignore
// import { MessageTypes } from 'whatsapp-web.js'
// import { Attachment } from './mock/user/types'
Expand All @@ -60,6 +61,7 @@ class PuppetWhatsapp extends PUPPET.Puppet {
private roomStore: { [id: string]: WhatsappContact }
private roomInvitationStore: { [id: string]: Partial<WAWebJS.InviteV4Data>}
private whatsapp: undefined | WhatsApp
private manager: undefined | Manager

constructor (
override options: PuppetWhatsAppOptions = {},
Expand All @@ -71,6 +73,7 @@ class PuppetWhatsapp extends PUPPET.Puppet {
this.contactStore = {}
this.roomStore = {}
this.roomInvitationStore = {}

}

override async start (): Promise<void> {
Expand All @@ -81,7 +84,7 @@ class PuppetWhatsapp extends PUPPET.Puppet {
}
const session = await this.memory.get(MEMORY_SLOT)
const whatsapp = await getWhatsApp(this.options['puppeteerOptions'] as ClientOptions, session)
this.whatsapp = whatsapp
this.manager = new Manager(whatsapp)
this.state.on('pending')
this.initWhatsAppEvents(whatsapp)

Expand Down Expand Up @@ -307,7 +310,7 @@ class PuppetWhatsapp extends PUPPET.Puppet {

override async contactSelfName (name: string): Promise<void> {
log.verbose('PuppetWhatsApp', 'contactSelfName(%s)', name)
await this.whatsapp!.setDisplayName(name)
await this.manager!.setNickname(name)
}

override async contactSelfSignature (signature: string): Promise<void> {
Expand Down
20 changes: 20 additions & 0 deletions src/work/manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { RequestManager } from './request/requestManager.js'
import type { Client as WhatsApp } from 'whatsapp-web.js'

export class Manager {

whatsapp: WhatsApp
requestManager: RequestManager

constructor (whatsapp: WhatsApp) {

this.whatsapp = whatsapp
void this.whatsapp.initialize()
this.requestManager = new RequestManager(this.whatsapp)
}

setNickname (nickname: string) {
return this.requestManager.setNickname(nickname)
}

}
116 changes: 116 additions & 0 deletions src/work/request/rateManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import { EventEmitter } from 'events'
import { log } from '../../config.js'
import { sleep } from '../utils.js'

interface FunctionObj {
func: () => any,
resolve: (data: any) => void,
reject: (e: any) => void,
delayBefore?: number,
delayAfter?: number,
uniqueKey?: string,
}

export interface RateOptions {
queueId?: string,
delayBefore?: number,
delayAfter?: number,
uniqueKey?: string,
}

type RateManagerEvents = 'error'

const MAX_QUEUE_SIZE = 5000

export class RateManager extends EventEmitter {

private counter = 0

public override emit(event: 'error', error: string): boolean
public override emit(event: never, ...args: never[]): never
public override emit (event: RateManagerEvents, ...args: any[]): boolean {
return super.emit(event, ...args)
}

public override on(event: 'error', listener: (error: string) => void): this
public override on(event: never, listener: never): never
public override on (event: RateManagerEvents, listener: (...args: any[]) => void): this {
super.on(event, listener)
return this
}

private functionQueueMap: { [id: string]: FunctionObj[] } = {}
private runningMap: { [id: string]: boolean } = {}

public getQueueLength (queueId: string) {
if (!this.functionQueueMap[queueId]) {
return 0
}
return this.functionQueueMap[queueId]!.length
}

public async exec<T> (func: () => T, options: RateOptions = {}) {
const queueId = options.queueId || 'default'
const { delayAfter, delayBefore, uniqueKey } = options

if (!this.functionQueueMap[queueId]) {
this.functionQueueMap[queueId] = []
}

if (this.functionQueueMap[queueId]!.length > MAX_QUEUE_SIZE) {
if (this.counter % MAX_QUEUE_SIZE === 0) {
log.error(`EXCEED_QUEUE_SIZE: Max queue size for id: ${queueId} reached: ${this.functionQueueMap[queueId]!.length} > ${MAX_QUEUE_SIZE}(max queue size). Drop these tasks.`)
this.counter = 0
}
this.counter++
}

return new Promise<T>((resolve, reject) => {
this.functionQueueMap[queueId]!.push({ delayAfter, delayBefore, func, reject, resolve, uniqueKey })
if (!this.runningMap[queueId]) {
this.runningMap[queueId] = true
void this.execNext(queueId)
}
})
}

private async execNext (queueId: string) {
const queue = this.functionQueueMap[queueId]
if (!queue) {
return
}

const funcObj = queue.shift()
if (!funcObj) {
throw new Error(`can not get funcObj from queue with id: ${queueId}.`)
}
const { delayAfter, delayBefore, func, resolve, reject, uniqueKey } = funcObj
await sleep(delayBefore)
try {
const result = await func()
resolve(result)
/**
* If uniqueKey is given, will resolve functions with same key in the queue
*/
if (uniqueKey) {
const sameFuncIndexes = queue.map((f, index) => ({ func: f, index }))
.filter(o => o.func.uniqueKey === uniqueKey)
.map(o => o.index)
.sort((a, b) => b - a)
for (const index of sameFuncIndexes) {
const [sameFunc] = queue.splice(index, 1)
sameFunc!.resolve(result)
}
}
} catch (e) {
reject(e)
}
await sleep(delayAfter)
if (queue.length > 0) {
await this.execNext(queueId)
} else {
delete this.runningMap[queueId]
}
}

}
Loading

0 comments on commit 6659566

Please sign in to comment.