Skip to content

Commit

Permalink
Use rb_io_wait if available and remove Windows specific wait functions
Browse files Browse the repository at this point in the history
Fall back to rb_wait_for_single_fd() on ruby < 3.0, which works on Windows as well.
  • Loading branch information
larskanis committed Sep 20, 2021
1 parent 63e00c2 commit 6c885e8
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 138 deletions.
145 changes: 31 additions & 114 deletions ext/pg_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -2289,103 +2289,43 @@ pgconn_notifies(VALUE self)
return hash;
}

/* Win32 + Ruby 1.9+ */
#if defined( _WIN32 )
/*
* On Windows, use platform-specific strategies to wait for the socket
* instead of rb_wait_for_single_fd().
*/

int rb_w32_wait_events( HANDLE *events, int num, DWORD timeout );

static void *
wait_socket_readable( PGconn *conn, struct timeval *ptimeout, void *(*is_readable)(PGconn *) )
{
int sd = PQsocket( conn );
void *retval;
struct timeval aborttime={0,0}, currtime, waittime;
DWORD timeout_milisec = INFINITE;
DWORD wait_ret;
WSAEVENT hEvent;

if ( sd < 0 )
rb_raise(rb_eConnectionBad, "PQsocket() can't get socket descriptor");

hEvent = WSACreateEvent();

/* Check for connection errors (PQisBusy is true on connection errors) */
if( PQconsumeInput(conn) == 0 ) {
WSACloseEvent( hEvent );
rb_raise( rb_eConnectionBad, "PQconsumeInput() %s", PQerrorMessage(conn) );
}

if ( ptimeout ) {
gettimeofday(&currtime, NULL);
timeradd(&currtime, ptimeout, &aborttime);
}

while ( !(retval=is_readable(conn)) ) {
if ( WSAEventSelect(sd, hEvent, FD_READ|FD_CLOSE) == SOCKET_ERROR ) {
WSACloseEvent( hEvent );
rb_raise( rb_eConnectionBad, "WSAEventSelect socket error: %d", WSAGetLastError() );
}

if ( ptimeout ) {
gettimeofday(&currtime, NULL);
timersub(&aborttime, &currtime, &waittime);
timeout_milisec = (DWORD)( waittime.tv_sec * 1e3 + waittime.tv_usec / 1e3 );
}

/* Is the given timeout valid? */
if( !ptimeout || (waittime.tv_sec >= 0 && waittime.tv_usec >= 0) ){
/* Wait for the socket to become readable before checking again */
wait_ret = rb_w32_wait_events( &hEvent, 1, timeout_milisec );
} else {
wait_ret = WAIT_TIMEOUT;
}
#if !defined(HAVE_RB_IO_WAIT)

if ( wait_ret == WAIT_TIMEOUT ) {
WSACloseEvent( hEvent );
return NULL;
} else if ( wait_ret == WAIT_OBJECT_0 ) {
/* The event we were waiting for. */
} else if ( wait_ret == WAIT_OBJECT_0 + 1) {
/* This indicates interruption from timer thread, GC, exception
* from other threads etc... */
rb_thread_check_ints();
} else if ( wait_ret == WAIT_FAILED ) {
WSACloseEvent( hEvent );
rb_raise( rb_eConnectionBad, "Wait on socket error (WaitForMultipleObjects): %lu", GetLastError() );
} else {
WSACloseEvent( hEvent );
rb_raise( rb_eConnectionBad, "Wait on socket abandoned (WaitForMultipleObjects)" );
}
typedef enum {
RUBY_IO_READABLE = RB_WAITFD_IN,
RUBY_IO_WRITABLE = RB_WAITFD_OUT,
RUBY_IO_PRIORITY = RB_WAITFD_PRI,
} rb_io_event_t;

/* Check for connection errors (PQisBusy is true on connection errors) */
if ( PQconsumeInput(conn) == 0 ) {
WSACloseEvent( hEvent );
rb_raise( rb_eConnectionBad, "PQconsumeInput() %s", PQerrorMessage(conn) );
}
static VALUE
rb_io_wait(VALUE io, VALUE events, VALUE timeout) {
rb_io_t *fptr;
struct timeval waittime;
int res;

GetOpenFile((io), fptr);
if( !NIL_P(timeout) ){
waittime.tv_sec = (time_t)(NUM2DBL(timeout));
waittime.tv_usec = (time_t)(NUM2DBL(timeout) - (double)waittime.tv_sec);
}
res = rb_wait_for_single_fd(fptr->fd, NUM2UINT(events), NIL_P(timeout) ? NULL : &waittime);

WSACloseEvent( hEvent );
return retval;
return UINT2NUM(res);
}

#else

/* non Win32 */
#endif

static void *
wait_socket_readable( PGconn *conn, struct timeval *ptimeout, void *(*is_readable)(PGconn *))
wait_socket_readable( VALUE self, struct timeval *ptimeout, void *(*is_readable)(PGconn *))
{
int sd = PQsocket( conn );
int ret;
VALUE socket_io;
VALUE ret;
void *retval;
struct timeval aborttime={0,0}, currtime, waittime;
VALUE wait_timeout = Qnil;
PGconn *conn = pg_get_pgconn(self);

if ( sd < 0 )
rb_raise(rb_eConnectionBad, "PQsocket() can't get socket descriptor");
socket_io = pgconn_socket_io(self);

/* Check for connection errors (PQisBusy is true on connection errors) */
if ( PQconsumeInput(conn) == 0 )
Expand All @@ -2400,22 +2340,19 @@ wait_socket_readable( PGconn *conn, struct timeval *ptimeout, void *(*is_readabl
if ( ptimeout ) {
gettimeofday(&currtime, NULL);
timersub(&aborttime, &currtime, &waittime);
wait_timeout = DBL2NUM((double)(waittime.tv_sec) + (double)(waittime.tv_usec) / 1000000.0);
}

/* Is the given timeout valid? */
if( !ptimeout || (waittime.tv_sec >= 0 && waittime.tv_usec >= 0) ){
/* Wait for the socket to become readable before checking again */
ret = rb_wait_for_single_fd( sd, RB_WAITFD_IN, ptimeout ? &waittime : NULL );
ret = rb_io_wait(socket_io, RB_INT2NUM(RUBY_IO_READABLE), wait_timeout);
} else {
ret = 0;
}

if ( ret < 0 ){
rb_sys_fail( "rb_wait_for_single_fd()" );
ret = Qfalse;
}

/* Return false if the select() timed out */
if ( ret == 0 ){
if ( ret == Qfalse ){
return NULL;
}

Expand All @@ -2429,8 +2366,6 @@ wait_socket_readable( PGconn *conn, struct timeval *ptimeout, void *(*is_readabl
}


#endif

static void *
notify_readable(PGconn *conn)
{
Expand Down Expand Up @@ -2468,7 +2403,7 @@ pgconn_wait_for_notify(int argc, VALUE *argv, VALUE self)
ptimeout = &timeout;
}

pnotification = (PGnotify*) wait_socket_readable( this->pgconn, ptimeout, notify_readable);
pnotification = (PGnotify*) wait_socket_readable( self, ptimeout, notify_readable);

/* Return nil if the select timed out */
if ( !pnotification ) return Qnil;
Expand Down Expand Up @@ -3050,7 +2985,7 @@ pgconn_block( int argc, VALUE *argv, VALUE self ) {
ptimeout = &timeout;
}

ret = wait_socket_readable( conn, ptimeout, get_result_readable);
ret = wait_socket_readable( self, ptimeout, get_result_readable);

if( !ret )
return Qfalse;
Expand Down Expand Up @@ -3101,24 +3036,6 @@ pgconn_get_last_result(VALUE self)
return rb_pgresult;
}

#if !defined(HAVE_RB_IO_WAIT)

typedef enum {
RUBY_IO_READABLE = RB_WAITFD_IN,
RUBY_IO_WRITABLE = RB_WAITFD_OUT,
RUBY_IO_PRIORITY = RB_WAITFD_PRI,
} rb_io_event_t;

#define rb_io_wait(io, event, timeout) do { \
rb_io_t *fptr; \
GetOpenFile((io), fptr); \
if( (event) == RB_INT2NUM(RUBY_IO_READABLE) ) \
rb_io_wait_readable(fptr->fd); \
else \
rb_io_wait_writable(fptr->fd); \
} while(0)
#endif

/*
* call-seq:
* conn.discard_results()
Expand Down
24 changes: 0 additions & 24 deletions lib/pg/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -308,30 +308,6 @@ def ssl_attributes
end
end

if RUBY_PLATFORM =~ /mingw|mswin/
def block(timeout=nil)
# pgconn_block in the C ext isn't Fiber.scheduler compatible on Windows.
# So use ruby code and socket_io.wait_readable instead.

# Buffer any incoming data on the socket until a full result is ready.
consume_input

if timeout
aborttime = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
end

while is_busy
tm = timeout &&
[0.0, aborttime - Process.clock_gettime(Process::CLOCK_MONOTONIC)].max
unless socket_io.wait_readable(tm)
return false
end
consume_input
end
return true
end
end

def async_exec(*args)
discard_results
async_send_query(*args)
Expand Down

0 comments on commit 6c885e8

Please sign in to comment.