Skip to content

Commit

Permalink
feat(async): queue size
Browse files Browse the repository at this point in the history
  • Loading branch information
Hfutsora committed Mar 8, 2023
1 parent 1a8a5e2 commit 2c7c3a5
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 7 deletions.
25 changes: 18 additions & 7 deletions src/Async.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import { isNullable } from './function'
import { isEmpty } from './Iterator'
import { Either } from './Either'
import { Maybe } from './Maybe'

/** Asynchronous tasks queued for execution */
type AsyncTaskQueue = { f: () => Promise<any>, then: (v: any) => void, err: (e: any) => void }[]
Expand Down Expand Up @@ -32,9 +30,22 @@ export type AsyncQueueOption = {
*/
export class AsyncQueue {
private limit: number | undefined
private size = 0
private count = 0
private queue: AsyncTaskQueue = []

/**
* Returns the total number of tasks currently being executed and in the waiting queue.
*/
get size() {
return this.queue.length + this.count
}
/**
* Alias of `size`.
*/
get length() {
return this.size
}

/**
* @param limit Maximum concurrency limit
*/
Expand All @@ -49,18 +60,18 @@ export class AsyncQueue {

private async process<A>(a: () => PromiseLike<A>) {
try {
this.size += 1
this.count += 1

return await a()
} finally {
this.size -= 1
this.count -= 1
this.afterProcess()
}
}

private async afterProcess() {
if(isEmpty(this.queue)) return
if(isNullable(this.limit) || (this.limit && this.size < this.limit) ) {
if(isNullable(this.limit) || (this.limit && this.count < this.limit) ) {
const { f, then, err } = this.queue.shift()!
this.process(f).then(then, err)
}
Expand All @@ -70,7 +81,7 @@ export class AsyncQueue {
* Runs a asynchronous task in the async queue.
*/
async run<A>(f: () => Promise<A>): Promise<A> {
if(isNullable(this.limit) || (this.limit && this.size < this.limit) ) {
if(isNullable(this.limit) || (this.limit && this.count < this.limit) ) {
return await this.process(f)
} else {
return await new Promise<A>((then, err) => this.queue.push({ f, then, err }))
Expand Down
10 changes: 10 additions & 0 deletions test/Async.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,15 @@ test('microtask', () => {
expect(num).toBe(0)
})

test('async queue size', async () => {
const aqueue = new AsyncQueue(1)

aqueue.run(delay)
expect(aqueue.size).toEqual(1)
aqueue.run(delay)
expect(aqueue.size).toEqual(2)
})

test('async queue with limit 1', async () => {
const aqueue = new AsyncQueue(1)

Expand All @@ -28,6 +37,7 @@ test('async queue with limit 1', async () => {
aqueue.run(() => new Promise(then => delay(100).then(() => { then(100); seq.push(100) }))),
aqueue.run(() => new Promise(then => delay(150).then(() => { then(150); seq.push(150) })))
])
expect(aqueue.size).toEqual(0)
expect(seq).toEqual([200, 100, 150])
expect(res).toEqual([200, 100, 150])
})
Expand Down

0 comments on commit 2c7c3a5

Please sign in to comment.