diff --git a/src/async/async.c b/src/async/async.c index 8820eddc2..f3903f033 100644 --- a/src/async/async.c +++ b/src/async/async.c @@ -27,7 +27,8 @@ struct async_work { struct re_async { thrd_t *thrd; - uint16_t nthrds; + uint16_t thrds_req; + uint16_t thrds_started; RE_ATOMIC bool run; cnd_t wait; mtx_t mtx; @@ -80,7 +81,7 @@ static void async_destructor(void *data) cnd_broadcast(&async->wait); mtx_unlock(&async->mtx); - for (int i = 0; i < async->nthrds; i++) { + for (int i = 0; i < async->thrds_started; i++) { thrd_join(async->thrd[i], NULL); } @@ -111,11 +112,15 @@ static void worker_check(void *arg) static void queueh(int id, void *data, void *arg) { struct async_work *work = data; - (void)arg; + struct re_async *async = arg; (void)id; work->cb(work->err, work->arg); + + mtx_lock(&async->mtx); list_unlink(&work->le); + mtx_unlock(&async->mtx); + mem_deref(work); } @@ -156,15 +161,19 @@ int re_async_alloc(struct re_async **asyncp, uint16_t nthrds) mem_destructor(async, async_destructor); - async->nthrds = nthrds; + async->thrds_req = nthrds; re_atomic_rlx_set(&async->run, true); - for (int i = 0; i < async->nthrds; i++) { + for (int i = 0; i < async->thrds_req; i++) { err = thread_create_name(&async->thrd[i], "async worker thread", worker_thread, async); - if (err) + if (err) { + mem_deref(async); return err; + } + + async->thrds_started++; } tmr_start(&async->tmr, 10, worker_check, async);