Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add: fbr_poll and fbr_cond_wait_wto #12

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ if(WANT_VALGRIND)
endif(NOT HAVE_VALGRIND_H)
endif(WANT_VALGRIND)

check_include_files(poll.h HAVE_POLL_H)

include_directories(
"${CMAKE_CURRENT_SOURCE_DIR}/include"
"${CMAKE_CURRENT_BINARY_DIR}/include"
Expand Down
1 change: 1 addition & 0 deletions include/evfibers/config.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#ifndef _FBR_CONFIG_H_
#define _FBR_CONFIG_H_

#cmakedefine HAVE_POLL_H
#cmakedefine HAVE_VALGRIND_H
#cmakedefine FBR_EIO_ENABLED

Expand Down
41 changes: 39 additions & 2 deletions include/evfibers/fiber.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@
* @file evfibers/fiber.h
* This file contains all client-visible API functions for working with fibers.
*/
#include <evfibers/config.h>

#include <unistd.h>
#include <stdarg.h>
#include <stddef.h>
Expand All @@ -145,11 +147,12 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/queue.h>
#ifdef HAVE_POLL_H
#include <poll.h>
#endif
#include <assert.h>
#include <ev.h>

#include <evfibers/config.h>

/**
* Maximum allowed level of fbr_transfer nesting within fibers.
*/
Expand Down Expand Up @@ -1153,6 +1156,22 @@ int fbr_connect(FBR_P_ int sockfd, const struct sockaddr *addr,
int fbr_connect_wto(FBR_P_ int sockfd, const struct sockaddr *addr,
socklen_t addrlen, ev_tstamp timeout);

#ifdef HAVE_POLL_H
/**
* Fiber friendly poll wrapper.
* @param [in] fds - socket file descriptors to examine
* @param [in] nfds - specifies the size of the fds array
* @param [in] timeout_ms in milliseconds to wait for events
* @return number of descriptors that are ready for I/O on success, -1 in case of error and errno set
*
* Poll wrapper that examines a set of file descriptors to see if some of them
* are ready for I/O or if certain events have occurred on them.
*
* Possible errno values are described in the poll man page.
*/
int fbr_poll(FBR_P_ struct pollfd fds[], nfds_t nfds, int timeout_ms);
#endif /* HAVE_POLL_H */

/**
* Fiber friendly libc read wrapper.
* @param [in] fd file descriptor to read from
Expand Down Expand Up @@ -1528,6 +1547,24 @@ void fbr_cond_destroy(FBR_P_ struct fbr_cond_var *cond);
*/
int fbr_cond_wait(FBR_P_ struct fbr_cond_var *cond, struct fbr_mutex *mutex);

/**
* Waits timeout number of seconds or until the condition is met.
*
* Current fiber is suspended until a signal is sent via fbr_cond_signal or
* fbr_cond_broadcast to the corresponding conditional variable or
* the timer expires.
*
* A mutex must be acquired by the calling fiber prior to waiting for a
* condition. Internally mutex is released and reacquired again before
* returning. Upon successful return calling fiber will hold the mutex.
*
* @see fbr_cond_init
* @see fbr_cond_destroy
* @see fbr_cond_broadcast
* @see fbr_cond_signal
*/
int fbr_cond_wait_wto(FBR_P_ struct fbr_cond_var *cond, struct fbr_mutex *mutex, ev_tstamp timeout);

/**
* Broadcasts a signal to all fibers waiting for condition.
*
Expand Down
53 changes: 53 additions & 0 deletions src/fiber.c
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,45 @@ int fbr_connect_wto(FBR_P_ int sockfd, const struct sockaddr *addr,
return r;
}

#ifdef HAVE_POLL_H
int fbr_poll(FBR_P_ struct pollfd fds[], nfds_t nfds, int timeout_ms)
{
nfds_t i;
ev_io io[nfds];
struct fbr_ev_base *events[nfds+1];
struct fbr_ev_watcher watcher[nfds];
struct fbr_destructor dtor[nfds];

if (timeout_ms != 0) {
for (i = 0; i < nfds; ++i) {
ev_io_init(&io[i], NULL, fds[i].fd,
(fds[i].events & POLLIN ? EV_READ : 0) |
(fds[i].events & POLLOUT ? EV_WRITE : 0));
ev_io_start(fctx->__p->loop, &io[i]);

fbr_ev_watcher_init(FBR_A_ &watcher[i],
(struct ev_watcher *)&io[i]);
events[i] = &watcher[i].ev_base;

dtor[i].func = watcher_io_dtor;
dtor[i].arg = &io[i];
dtor[i].active = 0;
fbr_destructor_add(FBR_A_ &dtor[i]);
}
events[i] = NULL;

if (timeout_ms > 0)
fbr_ev_wait_to(FBR_A_ events, timeout_ms/1000.0);
else
fbr_ev_wait(FBR_A_ events);

for (i = 0; i < nfds; ++i) {
fbr_destructor_remove(FBR_A_ &dtor[i], 1);
}
}
return poll(fds, nfds, 0); /* Non-blocking when timeout is 0. */
}
#endif /* HAVE_POLL_H */

ssize_t fbr_read(FBR_P_ int fd, void *buf, size_t count)
{
Expand Down Expand Up @@ -1691,6 +1730,20 @@ int fbr_cond_wait(FBR_P_ struct fbr_cond_var *cond, struct fbr_mutex *mutex)
return_success(0);
}

int fbr_cond_wait_wto(FBR_P_ struct fbr_cond_var *cond,
struct fbr_mutex *mutex, ev_tstamp timeout)
{
struct fbr_ev_cond_var ev;

if (mutex && fbr_id_isnull(mutex->locked_by))
return_error(-1, FBR_EINVAL);

fbr_ev_cond_var_init(FBR_A_ &ev, cond, mutex);
if (fbr_ev_wait_one_wto(FBR_A_ &ev.ev_base, timeout) == -1)
fbr_mutex_lock(FBR_A_ mutex);
return_success(0);
}

void fbr_cond_broadcast(FBR_P_ struct fbr_cond_var *cond)
{
struct fbr_id_tailq_i *item;
Expand Down