From 2f84580fa42e53e7bd199e3f5798da55f8cd9f1d Mon Sep 17 00:00:00 2001 From: ymmt Date: Fri, 16 Aug 2024 00:03:01 +0000 Subject: [PATCH] wip --- cybozu/reactor.hpp | 9 +++++---- cybozu/tcp.hpp | 25 +++++++++++++++++++++++++ src/counter/sockets.cpp | 6 +++--- src/counter/sockets.hpp | 8 ++++---- 4 files changed, 37 insertions(+), 11 deletions(-) diff --git a/cybozu/reactor.hpp b/cybozu/reactor.hpp index 57084d8..0c99a8f 100644 --- a/cybozu/reactor.hpp +++ b/cybozu/reactor.hpp @@ -133,11 +133,12 @@ class resource { // This is intended to be called by non-reactor threads. // // This calls `f` only if this resource is still valid. - // If it is invalid, this returns `false`. + // If it is invalid, this returns `false`. Otherwise, the + // return value will be the return value of `f`. // // The template function `f` should return `true` if it wants to keep - // the resource valid. Otherwise, it should return `false`, then - // `with_fd` invalidates the resource. + // the resource valid. If it should returns `false`, then `with_fd` + // invalidates the resource. template bool with_fd(Func&& f) { read_lock g(m_lock); @@ -149,7 +150,7 @@ class resource { } invalidate_and_close(); - return true; + return false; } private: diff --git a/cybozu/tcp.hpp b/cybozu/tcp.hpp index 23022d9..126a581 100644 --- a/cybozu/tcp.hpp +++ b/cybozu/tcp.hpp @@ -4,6 +4,7 @@ #ifndef CYBOZU_TCP_HPP #define CYBOZU_TCP_HPP +#include "dynbuf.hpp" #include "ip_address.hpp" #include "reactor.hpp" #include "util.hpp" @@ -170,6 +171,30 @@ class tcp_socket: public resource { }); } + bool recv(dynbuf& buf) { + return with_fd([this, &buf](int fd) -> bool { + char* p = buf.prepare(4096); + ssize_t n = ::recv(fd, p, 4096, 0); + if( n == -1 ) { + if( errno == EAGAIN || errno == EWOULDBLOCK ) + return true; + if( errno == EINTR ) + return true; + if( errno == ECONNRESET ) { + buf.reset(); + return invalidate(); + } + throw_unix_error(errno, "recv"); + } + if( n == 0 ) { + buf.reset(); + return invalidate(); + } + buf.consume(n); + return true; + }); + } + protected: // Write out pending data. // diff --git a/src/counter/sockets.cpp b/src/counter/sockets.cpp index e5cb8a1..d9153bc 100644 --- a/src/counter/sockets.cpp +++ b/src/counter/sockets.cpp @@ -94,7 +94,7 @@ counter_socket::~counter_socket() { release_all(); } -bool counter_socket::on_readable() { +bool counter_socket::on_readable(int fd) { if( m_busy.load(std::memory_order_acquire) ) { m_reactor->add_readable(*this); return true; @@ -112,11 +112,11 @@ bool counter_socket::on_readable() { return true; } -bool counter_socket::on_writable() { +bool counter_socket::on_writable(int fd) { cybozu::worker* w = m_finder(); if( w == nullptr ) { // if there is no idle worker, fallback to the default. - return cybozu::tcp_socket::on_writable(); + return cybozu::tcp_socket::on_writable(fd); } w->post_job(m_sendjob); diff --git a/src/counter/sockets.hpp b/src/counter/sockets.hpp index 0c56ae3..116e2fd 100644 --- a/src/counter/sockets.hpp +++ b/src/counter/sockets.hpp @@ -42,13 +42,13 @@ class counter_socket: public cybozu::tcp_socket { std::unordered_map m_acquired_resources; - virtual void on_invalidate() override final { + virtual void on_invalidate(int fd) override final { g_stats.curr_connections.fetch_sub(1); - cybozu::tcp_socket::on_invalidate(); + cybozu::tcp_socket::on_invalidate(fd); } - bool on_readable() override; - bool on_writable() override; + bool on_readable(int) override; + bool on_writable(int) override; void cmd_get(const counter::request& cmd, counter::response& r); void cmd_acquire(const counter::request& cmd, counter::response& r);