Skip to content

Commit

Permalink
Fix semaphore for macOS
Browse files Browse the repository at this point in the history
Use of named semaphores had issues (incorrect use of sem_open), which
caused essentially an infinite loop.

This change replaces use of named semaphore with a semaphore built on
top of pthread condvar.
  • Loading branch information
hkr committed May 16, 2021
1 parent 9f071d7 commit 9ebe47c
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 86 deletions.
58 changes: 57 additions & 1 deletion sema.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,62 @@ sem_destroy(sem_t *sem)
return (CloseHandle(sem->h) ? 0 : -1);
}

#endif /* _BUILD_WIN32 defined? */
#elif defined(__MACH__)
#include <pthread.h>

typedef struct sema_type
{
pthread_mutex_t mutex;
pthread_cond_t cond;
unsigned int counter;
} sem_t;

static int
sem_init(sem_t *sem,
int pshared,
unsigned int value)
{
if (pthread_mutex_init(&sem->mutex, NULL) != 0)
return -1;
if (pthread_cond_init(&sem->cond, NULL) != 0) {
(void)pthread_mutex_destroy(&sem->mutex);
return -1;
}
sem->counter = value;
return 0;
}

static int
sem_wait(sem_t *sem)
{
if (pthread_mutex_lock(&sem->mutex))
return -1;
while (sem->counter == 0)
(void)pthread_cond_wait(&sem->cond, &sem->mutex);
--sem->counter;
(void)pthread_mutex_unlock(&sem->mutex);
return 0;
}

static int
sem_post(sem_t *sem)
{
if (pthread_mutex_lock(&sem->mutex))
return -1;
++sem->counter;
(void)pthread_mutex_unlock(&sem->mutex);
if (pthread_cond_signal(&sem->cond))
return -1;
return 0;
}

static int
sem_destroy(sem_t *sem)
{
(void)pthread_mutex_destroy(&sem->mutex);
(void)pthread_cond_destroy(&sem->cond);
}

#endif

#endif /* _SEMA_H defined? */
90 changes: 5 additions & 85 deletions thd_iin.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,16 @@
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/

#if defined(__MACH__)
#define USE_NAMED_SEMAPHORES
#endif

#if defined(_BUILD_WIN32)
#include <windows.h>
#include "sema.h" /* POSIX semaphores for win32 */
#elif defined(__MACH__)
#include "sema.h" /* POSIX semaphores for macOS */
#else
#include <pthread.h>
#include <semaphore.h>
#endif
#if defined(USE_NAMED_SEMAPHORES)
#include <unistd.h>
#include <limits.h>
#endif
#include <stdio.h>
#include <string.h>
#include "iin.h"
Expand Down Expand Up @@ -71,11 +66,7 @@ typedef struct threaded_decorator_type
job_t job;

/* both are exclusive => at any moment only one is acquirable */
#ifdef USE_NAMED_SEMAPHORES
sem_t *worker_lock, *master_lock;
#else
sem_t worker_lock, master_lock;
#endif

buffer_t buf[2];
size_t active_buf; /* <= % 2 == index of the current buffer */
Expand All @@ -96,11 +87,7 @@ thd_loop(void *p)
threaded_decorator_t *thd = (threaded_decorator_t *)p;

do {
#if defined(USE_NAMED_SEMAPHORES)
int result = sem_wait(thd->worker_lock);
#else
int result = sem_wait(&thd->worker_lock);
#endif
if (result == 0 && !thd->done) {
job_t *job = &thd->job;
buffer_t *dest = job->dest;
Expand All @@ -118,20 +105,12 @@ thd_loop(void *p)
dest->num_sectors = job->num_sectors;

/* unlock master */
#if defined(USE_NAMED_SEMAPHORES)
sem_post(thd->master_lock);
#else
sem_post(&thd->master_lock);
#endif
}
} while (!thd->done);

/* notify master */
#if defined(USE_NAMED_SEMAPHORES)
sem_post(thd->master_lock);
#else
sem_post(&thd->master_lock);
#endif
return (0);
}

Expand All @@ -148,11 +127,7 @@ thd_read(iin_t *iin,
int result;

do {
#if defined(USE_NAMED_SEMAPHORES)
result = sem_wait(thd->master_lock);
#else
result = sem_wait(&thd->master_lock);
#endif
if (result == 0) { /* worker is idle */
const buffer_t *buf = thd->buf + (thd->active_buf % 2);
if (start_sector == buf->start_sector &&
Expand All @@ -165,11 +140,7 @@ thd_read(iin_t *iin,
thd->job.start_sector = start_sector;
thd->job.num_sectors = num_sectors;
thd->job.dest = thd->buf + (thd->active_buf % 2);
#if defined(USE_NAMED_SEMAPHORES)
sem_post(thd->worker_lock);
#else
sem_post(&thd->worker_lock);
#endif
}
}
} while (1);
Expand All @@ -180,19 +151,11 @@ thd_read(iin_t *iin,
thd->job.start_sector = start_sector + num_sectors;
thd->job.num_sectors = IIN_NUM_SECTORS;
thd->job.dest = thd->buf + (++thd->active_buf % 2);
#if defined(USE_NAMED_SEMAPHORES)
sem_post(thd->worker_lock);
#else
sem_post(&thd->worker_lock);
#endif
} else
/* since there is no pre-fetch scheduled,
/* since there is no pre-fetch scheduled,
unlock master or the next read call would block forever */
#if defined(USE_NAMED_SEMAPHORES)
sem_post(thd->master_lock);
#else
sem_post(&thd->master_lock);
#endif

return (result);
}
Expand All @@ -219,28 +182,14 @@ thd_close(iin_t *iin)
int result;

/* wait for worker to finish... */
#if defined(USE_NAMED_SEMAPHORES)
sem_wait(thd->master_lock);
#else
sem_wait(&thd->master_lock);
#endif
thd->done = 1;
#if defined(USE_NAMED_SEMAPHORES)
sem_post(thd->worker_lock);
sem_wait(thd->master_lock);
#else
sem_post(&thd->worker_lock);
sem_wait(&thd->master_lock);
#endif

/* clean-up */
#if defined(USE_NAMED_SEMAPHORES)
sem_close(thd->worker_lock);
sem_close(thd->master_lock);
#else
sem_destroy(&thd->worker_lock);
sem_destroy(&thd->master_lock);
#endif
result = thd->worker->close(thd->worker);
thd->worker = NULL;

Expand Down Expand Up @@ -274,9 +223,6 @@ thd_create(iin_t *worker)
{
u_int32_t total_sectors, sector_size;
threaded_decorator_t *thd;
#if defined(USE_NAMED_SEMAPHORES)
char sema_name_buf[NAME_MAX - 4];
#endif

if (worker->stat(worker, &sector_size, &total_sectors) != RET_OK)
return (NULL);
Expand All @@ -298,26 +244,8 @@ thd_create(iin_t *worker)
thd->done = 0;

/* master is unlocked, worker is locked; would run on 1st need */
#if defined(USE_NAMED_SEMAPHORES)
snprintf(sema_name_buf, sizeof(sema_name_buf), "hdl_dump%x_%i", getpid(), 0);
thd->worker_lock = sem_open((const char *)sema_name_buf, O_CREAT);
if (thd->worker_lock != NULL)
#else
if (sem_init(&thd->worker_lock, 0, 0) == 0)
#endif
{
#if defined(USE_NAMED_SEMAPHORES)
sem_unlink((const char *)sema_name_buf);
snprintf(sema_name_buf, sizeof(sema_name_buf), "hdl_dump%x_%i", getpid(), 1);
thd->master_lock = sem_open((const char *)sema_name_buf, O_CREAT);
if (thd->master_lock != NULL)
#else
if (sem_init(&thd->master_lock, 0, 1) == 0)
#endif
{
#if defined(USE_NAMED_SEMAPHORES)
sem_unlink((const char *)sema_name_buf);
#endif
if (sem_init(&thd->worker_lock, 0, 0) == 0) {
if (sem_init(&thd->master_lock, 0, 1) == 0) {
#if defined(_BUILD_WIN32)
DWORD thread_id = 0;
HANDLE h = CreateThread(NULL, 0, &thd_loop, thd, 0, &thread_id);
Expand All @@ -330,17 +258,9 @@ thd_create(iin_t *worker)
return ((iin_t *)thd); /* unix: success */
#endif

#if defined(USE_NAMED_SEMAPHORES)
(void)sem_close(thd->master_lock);
#else
(void)sem_destroy(&thd->master_lock);
#endif
}
#if defined(USE_NAMED_SEMAPHORES)
(void)sem_close(thd->worker_lock);
#else
(void)sem_destroy(&thd->worker_lock);
#endif
}
osal_free(thd), thd = NULL;
}
Expand Down

0 comments on commit 9ebe47c

Please sign in to comment.