Skip to content

Commit

Permalink
bugfix: pause will disable write event
Browse files Browse the repository at this point in the history
  • Loading branch information
cloudwu committed Oct 12, 2020
1 parent 471c7d1 commit f3acbe4
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 38 deletions.
4 changes: 2 additions & 2 deletions skynet-src/socket_epoll.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ sp_del(int efd, int sock) {
}

static void
sp_write(int efd, int sock, void *ud, bool enable) {
sp_enable(int efd, int sock, void *ud, bool read_enable, bool write_enable) {
struct epoll_event ev;
ev.events = EPOLLIN | (enable ? EPOLLOUT : 0);
ev.events = (read_enable ? EPOLLIN : 0) | (write_enable ? EPOLLOUT : 0);
ev.data.ptr = ud;
epoll_ctl(efd, EPOLL_CTL_MOD, sock, &ev);
}
Expand Down
8 changes: 6 additions & 2 deletions skynet-src/socket_kqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,13 @@ sp_add(int kfd, int sock, void *ud) {
}

static void
sp_write(int kfd, int sock, void *ud, bool enable) {
sp_write(int kfd, int sock, void *ud, bool read_enable, bool write_enable) {

This comment has been minimized.

Copy link
@hanxi

hanxi Oct 13, 2020

Contributor

maybe sp_enable

struct kevent ke;
EV_SET(&ke, sock, EVFILT_WRITE, enable ? EV_ENABLE : EV_DISABLE, 0, 0, ud);
EV_SET(&ke, sock, EVFILT_READ, read_enable ? EV_ENABLE : EV_DISABLE, 0, 0, ud);
if (kevent(kfd, &ke, 1, NULL, 0, NULL) == -1 || ke.flags & EV_ERROR) {
// todo: check error
}
EV_SET(&ke, sock, EVFILT_WRITE, write_enable ? EV_ENABLE : EV_DISABLE, 0, 0, ud);
if (kevent(kfd, &ke, 1, NULL, 0, NULL) == -1 || ke.flags & EV_ERROR) {
// todo: check error
}
Expand Down
2 changes: 1 addition & 1 deletion skynet-src/socket_poll.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ static poll_fd sp_create();
static void sp_release(poll_fd fd);
static int sp_add(poll_fd fd, int sock, void *ud);
static void sp_del(poll_fd fd, int sock);
static void sp_write(poll_fd, int sock, void *ud, bool enable);
static void sp_enable(poll_fd, int sock, void *ud, bool read_enable, bool write_enable);
static int sp_wait(poll_fd, struct event *e, int max);
static void sp_nonblocking(int sock);

Expand Down
65 changes: 32 additions & 33 deletions skynet-src/socket_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@
#define PRIORITY_HIGH 0
#define PRIORITY_LOW 1

#define READING_PAUSE 0
#define READING_RESUME 1
#define READING_CLOSE 2

#define HASH_ID(id) (((unsigned)id) % MAX_SOCKET)
#define ID_TAG16(id) ((id>>MAX_SOCKET_P) & 0xffff)

Expand Down Expand Up @@ -99,7 +95,8 @@ struct socket {
int id;
uint8_t protocol;
uint8_t type;
uint8_t reading;
bool reading;
bool writing;
int udpconnecting;
int64_t warn_size;
union {
Expand Down Expand Up @@ -482,10 +479,7 @@ force_close(struct socket_server *ss, struct socket *s, struct socket_lock *l, s
assert(s->type != SOCKET_TYPE_RESERVE);
free_wb_list(ss,&s->high);
free_wb_list(ss,&s->low);
if (s->reading == READING_RESUME) {
sp_del(ss->event_fd, s->fd);
}
s->reading = READING_CLOSE;
sp_del(ss->event_fd, s->fd);
socket_lock(l);
if (s->type != SOCKET_TYPE_BIND) {
if (close(s->fd) < 0) {
Expand Down Expand Up @@ -531,20 +525,19 @@ check_wb_list(struct wb_list *s) {
}

static struct socket *
new_fd(struct socket_server *ss, int id, int fd, int protocol, uintptr_t opaque, bool add) {
new_fd(struct socket_server *ss, int id, int fd, int protocol, uintptr_t opaque, bool reading) {
struct socket * s = &ss->slot[HASH_ID(id)];
assert(s->type == SOCKET_TYPE_RESERVE);

if (add) {
if (sp_add(ss->event_fd, fd, s)) {
s->type = SOCKET_TYPE_INVALID;
return NULL;
}
if (sp_add(ss->event_fd, fd, s)) {
s->type = SOCKET_TYPE_INVALID;
return NULL;
}

s->id = id;
s->fd = fd;
s->reading = add ? READING_RESUME : READING_PAUSE;
s->reading = reading;
s->writing = false;
s->sending = ID_TAG16(id) << 16 | 0;
s->protocol = protocol;
s->p.size = MIN_READ_BUFFER;
Expand All @@ -559,6 +552,22 @@ new_fd(struct socket_server *ss, int id, int fd, int protocol, uintptr_t opaque,
return s;
}

static inline void
enable_write(struct socket_server *ss, struct socket *s, bool enable) {
if (s->writing != enable) {
s->writing = enable;
sp_enable(ss->event_fd, s->fd, s, s->reading, enable);
}
}

static inline void
enable_read(struct socket_server *ss, struct socket *s, bool enable) {
if (s->reading != enable) {
s->reading = enable;
sp_enable(ss->event_fd, s->fd, s, enable, s->writing);
}
}

static inline void
stat_read(struct socket_server *ss, struct socket *s, int n) {
s->stat.read += n;
Expand Down Expand Up @@ -636,7 +645,7 @@ open_socket(struct socket_server *ss, struct request_open * request, struct sock
return SOCKET_OPEN;
} else {
ns->type = SOCKET_TYPE_CONNECTING;
sp_write(ss->event_fd, ns->fd, ns, true);
enable_write(ss, ns, true);
}

freeaddrinfo( ai_list );
Expand Down Expand Up @@ -817,7 +826,7 @@ send_buffer_(struct socket_server *ss, struct socket *s, struct socket_lock *l,
}
// step 4
assert(send_buffer_empty(s) && s->wb_size == 0);
sp_write(ss->event_fd, s->fd, s, false);
enable_write(ss, s, false);

if (s->type == SOCKET_TYPE_HALFCLOSE) {
force_close(ss, s, l, result);
Expand Down Expand Up @@ -954,7 +963,7 @@ send_socket(struct socket_server *ss, struct request_send * request, struct sock
return -1;
}
}
sp_write(ss->event_fd, s->fd, s, true);
enable_write(ss, s, true);
} else {
if (s->protocol == PROTOCOL_TCP) {
if (priority == PRIORITY_LOW) {
Expand Down Expand Up @@ -1067,14 +1076,7 @@ resume_socket(struct socket_server *ss, struct request_resumepause *request, str
}
struct socket_lock l;
socket_lock_init(s, &l);
if (s->reading == READING_PAUSE) {
if (sp_add(ss->event_fd, s->fd, s)) {
force_close(ss, s, &l, result);
result->data = strerror(errno);
return SOCKET_ERR;
}
s->reading = READING_RESUME;
}
enable_read(ss, s, true);
if (s->type == SOCKET_TYPE_PACCEPT || s->type == SOCKET_TYPE_PLISTEN) {
s->type = (s->type == SOCKET_TYPE_PACCEPT) ? SOCKET_TYPE_CONNECTED : SOCKET_TYPE_LISTEN;
s->opaque = request->opaque;
Expand All @@ -1097,10 +1099,7 @@ pause_socket(struct socket_server *ss, struct request_resumepause *request) {
if (s->type == SOCKET_TYPE_INVALID || s->id !=id) {
return;
}
if (s->reading == READING_RESUME) {
sp_del(ss->event_fd, s->fd);
s->reading = READING_PAUSE;
}
enable_read(ss, s, false);
}

static void
Expand Down Expand Up @@ -1408,7 +1407,7 @@ report_connect(struct socket_server *ss, struct socket *s, struct socket_lock *l
result->id = s->id;
result->ud = 0;
if (nomore_sending_data(s)) {
sp_write(ss->event_fd, s->fd, s, false);
enable_write(ss, s, false);
}
union sockaddr_all u;
socklen_t slen = sizeof(u);
Expand Down Expand Up @@ -1709,7 +1708,7 @@ socket_server_send(struct socket_server *ss, struct socket_sendbuffer *buf) {
s->dw_buffer = clone_buffer(buf, &s->dw_size);
s->dw_offset = n;

sp_write(ss->event_fd, s->fd, s, true);
enable_write(ss, s, true);

socket_unlock(&l);
return 0;
Expand Down

0 comments on commit f3acbe4

Please sign in to comment.