From f3acbe46df1bfe15170bf54a79df650dcb791b6a Mon Sep 17 00:00:00 2001 From: Cloud Wu Date: Mon, 12 Oct 2020 18:28:24 +0800 Subject: [PATCH] bugfix: pause will disable write event --- skynet-src/socket_epoll.h | 4 +-- skynet-src/socket_kqueue.h | 8 +++-- skynet-src/socket_poll.h | 2 +- skynet-src/socket_server.c | 65 +++++++++++++++++++------------------- 4 files changed, 41 insertions(+), 38 deletions(-) diff --git a/skynet-src/socket_epoll.h b/skynet-src/socket_epoll.h index 58a26afd5..6a3ae733d 100644 --- a/skynet-src/socket_epoll.h +++ b/skynet-src/socket_epoll.h @@ -42,9 +42,9 @@ sp_del(int efd, int sock) { } static void -sp_write(int efd, int sock, void *ud, bool enable) { +sp_enable(int efd, int sock, void *ud, bool read_enable, bool write_enable) { struct epoll_event ev; - ev.events = EPOLLIN | (enable ? EPOLLOUT : 0); + ev.events = (read_enable ? EPOLLIN : 0) | (write_enable ? EPOLLOUT : 0); ev.data.ptr = ud; epoll_ctl(efd, EPOLL_CTL_MOD, sock, &ev); } diff --git a/skynet-src/socket_kqueue.h b/skynet-src/socket_kqueue.h index cf5a59ede..dae8955f6 100644 --- a/skynet-src/socket_kqueue.h +++ b/skynet-src/socket_kqueue.h @@ -56,9 +56,13 @@ sp_add(int kfd, int sock, void *ud) { } static void -sp_write(int kfd, int sock, void *ud, bool enable) { +sp_write(int kfd, int sock, void *ud, bool read_enable, bool write_enable) { struct kevent ke; - EV_SET(&ke, sock, EVFILT_WRITE, enable ? EV_ENABLE : EV_DISABLE, 0, 0, ud); + EV_SET(&ke, sock, EVFILT_READ, read_enable ? EV_ENABLE : EV_DISABLE, 0, 0, ud); + if (kevent(kfd, &ke, 1, NULL, 0, NULL) == -1 || ke.flags & EV_ERROR) { + // todo: check error + } + EV_SET(&ke, sock, EVFILT_WRITE, write_enable ? EV_ENABLE : EV_DISABLE, 0, 0, ud); if (kevent(kfd, &ke, 1, NULL, 0, NULL) == -1 || ke.flags & EV_ERROR) { // todo: check error } diff --git a/skynet-src/socket_poll.h b/skynet-src/socket_poll.h index 0a01f300c..785ae5b64 100644 --- a/skynet-src/socket_poll.h +++ b/skynet-src/socket_poll.h @@ -18,7 +18,7 @@ static poll_fd sp_create(); static void sp_release(poll_fd fd); static int sp_add(poll_fd fd, int sock, void *ud); static void sp_del(poll_fd fd, int sock); -static void sp_write(poll_fd, int sock, void *ud, bool enable); +static void sp_enable(poll_fd, int sock, void *ud, bool read_enable, bool write_enable); static int sp_wait(poll_fd, struct event *e, int max); static void sp_nonblocking(int sock); diff --git a/skynet-src/socket_server.c b/skynet-src/socket_server.c index 7d2aa738a..998413ff6 100644 --- a/skynet-src/socket_server.c +++ b/skynet-src/socket_server.c @@ -37,10 +37,6 @@ #define PRIORITY_HIGH 0 #define PRIORITY_LOW 1 -#define READING_PAUSE 0 -#define READING_RESUME 1 -#define READING_CLOSE 2 - #define HASH_ID(id) (((unsigned)id) % MAX_SOCKET) #define ID_TAG16(id) ((id>>MAX_SOCKET_P) & 0xffff) @@ -99,7 +95,8 @@ struct socket { int id; uint8_t protocol; uint8_t type; - uint8_t reading; + bool reading; + bool writing; int udpconnecting; int64_t warn_size; union { @@ -482,10 +479,7 @@ force_close(struct socket_server *ss, struct socket *s, struct socket_lock *l, s assert(s->type != SOCKET_TYPE_RESERVE); free_wb_list(ss,&s->high); free_wb_list(ss,&s->low); - if (s->reading == READING_RESUME) { - sp_del(ss->event_fd, s->fd); - } - s->reading = READING_CLOSE; + sp_del(ss->event_fd, s->fd); socket_lock(l); if (s->type != SOCKET_TYPE_BIND) { if (close(s->fd) < 0) { @@ -531,20 +525,19 @@ check_wb_list(struct wb_list *s) { } static struct socket * -new_fd(struct socket_server *ss, int id, int fd, int protocol, uintptr_t opaque, bool add) { +new_fd(struct socket_server *ss, int id, int fd, int protocol, uintptr_t opaque, bool reading) { struct socket * s = &ss->slot[HASH_ID(id)]; assert(s->type == SOCKET_TYPE_RESERVE); - if (add) { - if (sp_add(ss->event_fd, fd, s)) { - s->type = SOCKET_TYPE_INVALID; - return NULL; - } + if (sp_add(ss->event_fd, fd, s)) { + s->type = SOCKET_TYPE_INVALID; + return NULL; } s->id = id; s->fd = fd; - s->reading = add ? READING_RESUME : READING_PAUSE; + s->reading = reading; + s->writing = false; s->sending = ID_TAG16(id) << 16 | 0; s->protocol = protocol; s->p.size = MIN_READ_BUFFER; @@ -559,6 +552,22 @@ new_fd(struct socket_server *ss, int id, int fd, int protocol, uintptr_t opaque, return s; } +static inline void +enable_write(struct socket_server *ss, struct socket *s, bool enable) { + if (s->writing != enable) { + s->writing = enable; + sp_enable(ss->event_fd, s->fd, s, s->reading, enable); + } +} + +static inline void +enable_read(struct socket_server *ss, struct socket *s, bool enable) { + if (s->reading != enable) { + s->reading = enable; + sp_enable(ss->event_fd, s->fd, s, enable, s->writing); + } +} + static inline void stat_read(struct socket_server *ss, struct socket *s, int n) { s->stat.read += n; @@ -636,7 +645,7 @@ open_socket(struct socket_server *ss, struct request_open * request, struct sock return SOCKET_OPEN; } else { ns->type = SOCKET_TYPE_CONNECTING; - sp_write(ss->event_fd, ns->fd, ns, true); + enable_write(ss, ns, true); } freeaddrinfo( ai_list ); @@ -817,7 +826,7 @@ send_buffer_(struct socket_server *ss, struct socket *s, struct socket_lock *l, } // step 4 assert(send_buffer_empty(s) && s->wb_size == 0); - sp_write(ss->event_fd, s->fd, s, false); + enable_write(ss, s, false); if (s->type == SOCKET_TYPE_HALFCLOSE) { force_close(ss, s, l, result); @@ -954,7 +963,7 @@ send_socket(struct socket_server *ss, struct request_send * request, struct sock return -1; } } - sp_write(ss->event_fd, s->fd, s, true); + enable_write(ss, s, true); } else { if (s->protocol == PROTOCOL_TCP) { if (priority == PRIORITY_LOW) { @@ -1067,14 +1076,7 @@ resume_socket(struct socket_server *ss, struct request_resumepause *request, str } struct socket_lock l; socket_lock_init(s, &l); - if (s->reading == READING_PAUSE) { - if (sp_add(ss->event_fd, s->fd, s)) { - force_close(ss, s, &l, result); - result->data = strerror(errno); - return SOCKET_ERR; - } - s->reading = READING_RESUME; - } + enable_read(ss, s, true); if (s->type == SOCKET_TYPE_PACCEPT || s->type == SOCKET_TYPE_PLISTEN) { s->type = (s->type == SOCKET_TYPE_PACCEPT) ? SOCKET_TYPE_CONNECTED : SOCKET_TYPE_LISTEN; s->opaque = request->opaque; @@ -1097,10 +1099,7 @@ pause_socket(struct socket_server *ss, struct request_resumepause *request) { if (s->type == SOCKET_TYPE_INVALID || s->id !=id) { return; } - if (s->reading == READING_RESUME) { - sp_del(ss->event_fd, s->fd); - s->reading = READING_PAUSE; - } + enable_read(ss, s, false); } static void @@ -1408,7 +1407,7 @@ report_connect(struct socket_server *ss, struct socket *s, struct socket_lock *l result->id = s->id; result->ud = 0; if (nomore_sending_data(s)) { - sp_write(ss->event_fd, s->fd, s, false); + enable_write(ss, s, false); } union sockaddr_all u; socklen_t slen = sizeof(u); @@ -1709,7 +1708,7 @@ socket_server_send(struct socket_server *ss, struct socket_sendbuffer *buf) { s->dw_buffer = clone_buffer(buf, &s->dw_size); s->dw_offset = n; - sp_write(ss->event_fd, s->fd, s, true); + enable_write(ss, s, true); socket_unlock(&l); return 0;