Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async: add re_thread_async #462

Merged
merged 25 commits into from
Aug 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ endif()
set(HEADERS
include/re.h
include/re_aes.h
include/re_async.h
include/re_atomic.h
include/re_av1.h
include/re_base64.h
Expand Down Expand Up @@ -246,6 +247,8 @@ set(SRCS
src/av1/obu.c
src/av1/pkt.c

src/async/async.c

src/base64/b64.c

src/btrace/btrace.c
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ MODULES += dns
MODULES += md5 crc32 sha hmac base64
MODULES += udp sa net tcp tls
MODULES += list mbuf hash
MODULES += fmt tmr trace btrace main mem dbg sys thread mqueue
MODULES += fmt tmr trace btrace main mem dbg sys thread mqueue async
MODULES += mod conf
MODULES += bfcp
MODULES += aes srtp
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ Patches can sent via Github
| Name | Status | Description |
|----------|----------|------------------------------------------------|
| aes | stable | AES (Advanced Encryption Standard) |
| async | testing | Async module |
| base64 | stable | Base-64 encoding/decoding functions |
| bfcp | stable | The Binary Floor Control Protocol (BFCP) |
| conf | stable | Configuration file parser |
Expand Down Expand Up @@ -144,10 +145,10 @@ Patches can sent via Github
| tls | stable | Transport Layer Security |
| tmr | stable | Timer handling |
| turn | stable | Obtaining Relay Addresses from STUN (TURN) |
| trace | testing | Trace Helpers JSON traces (chrome://tracing) |
| udp | stable | UDP transport |
| uri | stable | Generic URI library |
| websock | stable | WebSocket Client and Server |
| trace | testing | Trace Helpers JSON traces (chrome://tracing) |

legend:
* *stable* - code complete; stable code and stable API
Expand Down
1 change: 1 addition & 0 deletions include/re.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ extern "C" {

/* Library modules */
#include "re_aes.h"
#include "re_async.h"
#include "re_base64.h"
#include "re_bfcp.h"
#include "re_btrace.h"
Expand Down
14 changes: 14 additions & 0 deletions include/re_async.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/**
* @file re_async.h async
*
* Copyright (C) 2022 Sebastian Reimers
*/

struct re_async;

typedef int (re_async_work_h)(void *arg);
typedef void (re_async_h)(int err, void *arg);

int re_async_alloc(struct re_async **asyncp, uint16_t workers);
int re_async(struct re_async *a, re_async_work_h *work, re_async_h *cb,
void *arg);
5 changes: 5 additions & 0 deletions include/re_main.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
* Copyright (C) 2010 Creytiv.com
*/

#include "re_async.h"

struct re;

enum {
Expand Down Expand Up @@ -55,6 +57,9 @@ void re_thread_close(void);
void re_thread_enter(void);
void re_thread_leave(void);
int re_thread_check(void);
int re_thread_async_init(uint16_t workers);
void re_thread_async_close(void);
int re_thread_async(re_async_work_h *work, re_async_h *cb, void *arg);

void re_set_mutex(void *mutexp);

Expand Down
246 changes: 246 additions & 0 deletions src/async/async.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
/**
* @file async.c Async API
*
* Copyright (C) 2022 Sebastian Reimers
*/

#include <re_types.h>
#include <re_mem.h>
#include <re_list.h>
#include <re_thread.h>
#include <re_async.h>
#include <re_tmr.h>
#include <re_mqueue.h>

#define DEBUG_MODULE "async"
#define DEBUG_LEVEL 5
#include <re_dbg.h>

struct async_work {
struct le le;
re_async_work_h *work;
re_async_h *cb;
void *arg;
int err;
};

struct re_async {
thrd_t *thrd;
uint16_t workers;
volatile bool run;
cnd_t wait;
mtx_t mtx;
struct list freel;
struct list workl;
struct list curl;
struct tmr tmr;
struct mqueue *mqueue;
};


static int worker_thread(void *arg)
{
struct re_async *a = arg;
struct le *le;
struct async_work *work;

for (;;) {
mtx_lock(&a->mtx);
if (!a->run) {
mtx_unlock(&a->mtx);
return 0;
}

if (list_isempty(&a->workl)) {
cnd_wait(&a->wait, &a->mtx);

if (list_isempty(&a->workl) || !a->run) {
mtx_unlock(&a->mtx);
continue;
}
}

le = list_head(&a->workl);
list_move(le, &a->curl);
mtx_unlock(&a->mtx);

work = le->data;
work->err = work->work(work->arg);

mtx_lock(&a->mtx);
mqueue_push(a->mqueue, 0, work);
mtx_unlock(&a->mtx);
}

return 0;
}


static void async_destructor(void *data)
{
struct re_async *async = data;

tmr_cancel(&async->tmr);

mtx_lock(&async->mtx);
async->run = false;
cnd_broadcast(&async->wait);
mtx_unlock(&async->mtx);

for (int i = 0; i < async->workers; i++) {
thrd_join(async->thrd[i], NULL);
}

list_flush(&async->workl);
list_flush(&async->curl);
list_flush(&async->freel);
cnd_destroy(&async->wait);
mtx_destroy(&async->mtx);
mem_deref(async->mqueue);
mem_deref(async->thrd);
}


static void worker_check(void *arg)
{
struct re_async *async = arg;

mtx_lock(&async->mtx);
if (!list_isempty(&async->workl)) {
if (async->workers == list_count(&async->curl))
DEBUG_WARNING("all async workers are busy\n");
else
cnd_broadcast(&async->wait);
}
mtx_unlock(&async->mtx);

tmr_start(&async->tmr, 100, worker_check, async);
}


/* called by re main event loop */
static void queueh(int id, void *data, void *arg)
{
struct async_work *work = data;
struct re_async *async = arg;
(void)id;

work->cb(work->err, work->arg);

mtx_lock(&async->mtx);
list_move(&work->le, &async->freel);
mtx_unlock(&async->mtx);
}


/**
* Allocate a new async object
*
* @param asyncp Pointer to allocated async object
* @param nthrds Number of worker threads
*
* @return 0 if success, otherwise errorcode
*/
int re_async_alloc(struct re_async **asyncp, uint16_t workers)
{
int err;
struct re_async *async;
struct async_work *async_work;

if (!asyncp || !workers)
return EINVAL;

async = mem_zalloc(sizeof(struct re_async), NULL);
if (!async)
return ENOMEM;

err = mqueue_alloc(&async->mqueue, queueh, async);
if (err)
goto err;

async->thrd = mem_zalloc(sizeof(thrd_t) * workers, NULL);
if (!async->thrd) {
err = ENOMEM;
mem_deref(async->mqueue);
goto err;
}

mtx_init(&async->mtx, mtx_plain);
cnd_init(&async->wait);
tmr_init(&async->tmr);

mem_destructor(async, async_destructor);

async->run = true;

for (int i = 0; i < workers; i++) {
err = thread_create_name(&async->thrd[i],
"async worker thread", worker_thread,
async);
if (err)
goto err;

async->workers++;

/* preallocate */
async_work = mem_zalloc(sizeof(struct async_work), NULL);
if (!async_work) {
err = ENOMEM;
goto err;
}

list_append(&async->freel, &async_work->le, async_work);
}

tmr_start(&async->tmr, 10, worker_check, async);

*asyncp = async;

return 0;

err:
mem_deref(async);
return err;
}


/**
* Execute work handler async and get a callback from re main thread
*
* @param async Pointer to async object
* @param work Work handler
* @param cb Callback handler (called by re main thread)
* @param arg Handler argument (has to be thread-safe)
*
* @return 0 if success, otherwise errorcode
*/
int re_async(struct re_async *async, re_async_work_h *work, re_async_h *cb,
void *arg)
{
int err = 0;
struct async_work *async_work;

if (unlikely(!async || !work || !cb))
return EINVAL;

if (unlikely(list_isempty(&async->freel))) {
async_work = mem_zalloc(sizeof(struct async_work), NULL);
if (!async_work)
return ENOMEM;
}
else {
async_work = list_head(&async->freel)->data;
list_unlink(&async_work->le);
}

async_work->work = work;
async_work->cb = cb;
async_work->arg = arg;

mtx_lock(&async->mtx);
list_append(&async->workl, &async_work->le, async_work);
cnd_signal(&async->wait);
mtx_unlock(&async->mtx);

return err;
}
5 changes: 5 additions & 0 deletions src/async/mod.mk
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#
# mod.mk
#

SRCS += async/async.c
Loading