Skip to content

Commit

Permalink
feat(parallel): clamp the limit between 1 and array length (#238)
Browse files Browse the repository at this point in the history
Co-authored-by: Alec Larson <[email protected]>
  • Loading branch information
MarlonPassos-git and aleclarson authored Dec 1, 2024
1 parent 14132e5 commit b4854f6
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 19 deletions.
4 changes: 4 additions & 0 deletions .github/next-minor.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,7 @@ https://github.com/radashi-org/radashi/pull/262
#### Add `signal` option to `parallel`

https://github.com/radashi-org/radashi/pull/262

#### Tolerate out-of-range `parallel` limit

https://github.com/radashi-org/radashi/pull/238
2 changes: 2 additions & 0 deletions docs/async/parallel.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ const users = await _.parallel(3, userIds, async userId => {
})
```

Since v12.3.0, if the limit is greater than the array length, it will be clamped to the array length. Similarly, if the limit is less than 1, it will be clamped to 1.

### Interrupting

Since v12.3.0, processing can be manually interrupted. Pass an `AbortController.signal` via the `signal` option. When the signal is aborted, no more calls to your callback will be made. Any in-progress calls will continue to completion, unless you manually connect the signal inside your callback. In other words, `parallel` is only responsible for aborting the array loop, not the async operations themselves.
Expand Down
23 changes: 15 additions & 8 deletions src/async/parallel.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {
AggregateError,
clamp,
flat,
fork,
isNumber,
Expand All @@ -22,12 +23,16 @@ type WorkItemResult<K> = {
error: any
}

export type ParallelOptions =
| {
limit: number
signal?: AbortSignal
}
| number
export type ParallelOptions = {
/**
* The maximum number of functions to run concurrently. If a
* negative number is passed, only one function will run at a time.
* If a number bigger than the array `length` is passed, the array
* length will be used.
*/
limit: number
signal?: AbortSignal
}

/**
* Executes many async functions in parallel. Returns the results from
Expand Down Expand Up @@ -57,7 +62,7 @@ export type ParallelOptions =
* @version 12.1.0
*/
export async function parallel<T, K>(
options: ParallelOptions,
options: ParallelOptions | number,
array: readonly T[],
func: (item: T) => Promise<K>,
): Promise<K[]> {
Expand Down Expand Up @@ -95,7 +100,9 @@ export async function parallel<T, K>(
}

const queues = Promise.all(
list(1, options.limit).map(() => new Promise(processor)),
list(1, clamp(options.limit, 1, array.length)).map(
() => new Promise(processor),
),
)

let signalPromise: Promise<never> | undefined
Expand Down
47 changes: 36 additions & 11 deletions tests/async/parallel.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,29 @@
import * as _ from 'radashi'

describe('parallel', () => {
async function testConcurrency(
limit: number,
array: readonly unknown[],
expected: number[],
) {
vi.useFakeTimers()

let numInProgress = 0
const tracking: number[] = []

const promise = _.parallel(limit, array, async () => {
numInProgress++
tracking.push(numInProgress)
await _.sleep(10)
numInProgress--
})

await vi.advanceTimersByTimeAsync(50)
await promise

expect(tracking).toEqual(expected)
}

test('returns all results from all functions', async () => {
const [errors, results] = await _.try(async () => {
return _.parallel(1, _.list(1, 3), async num => {
Expand Down Expand Up @@ -29,15 +52,7 @@ describe('parallel', () => {
})

test('does not run more than the limit at once', async () => {
let numInProgress = 0
const tracking: number[] = []
await _.parallel(3, _.list(1, 14), async () => {
numInProgress++
tracking.push(numInProgress)
await _.sleep(0)
numInProgress--
})
expect(Math.max(...tracking)).toBe(3)
await testConcurrency(3, _.list(1, 6), [1, 2, 3, 3, 3, 3])
})

test('abort before all iterations are complete', async () => {
Expand Down Expand Up @@ -88,8 +103,6 @@ describe('parallel', () => {
})

test('remove abort listener after completion', async () => {
vi.useFakeTimers()

const ctrl = new AbortController()
const removeEventListener = vi.spyOn(ctrl.signal, 'removeEventListener')

Expand All @@ -109,4 +122,16 @@ describe('parallel', () => {
expect.any(Function),
)
})

test('limit defaults to 1 if negative', async () => {
await testConcurrency(-1, _.list(1, 3), [1, 1, 1])
})

test('limit defaults to 1 if zero is passed', async () => {
await testConcurrency(0, _.list(1, 3), [1, 1, 1])
})

test('limit defaults to array length if Infinity is passed', async () => {
await testConcurrency(Number.POSITIVE_INFINITY, _.list(1, 3), [1, 2, 3])
})
})

0 comments on commit b4854f6

Please sign in to comment.