Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
ymmt2005 committed Aug 16, 2024
1 parent 4c8d5c9 commit 2f84580
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 11 deletions.
9 changes: 5 additions & 4 deletions cybozu/reactor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,12 @@ class resource {
// This is intended to be called by non-reactor threads.
//
// This calls `f` only if this resource is still valid.
// If it is invalid, this returns `false`.
// If it is invalid, this returns `false`. Otherwise, the
// return value will be the return value of `f`.
//
// The template function `f` should return `true` if it wants to keep
// the resource valid. Otherwise, it should return `false`, then
// `with_fd` invalidates the resource.
// the resource valid. If it should returns `false`, then `with_fd`
// invalidates the resource.
template<typename Func>
bool with_fd(Func&& f) {
read_lock g(m_lock);
Expand All @@ -149,7 +150,7 @@ class resource {
}

invalidate_and_close();
return true;
return false;
}

private:
Expand Down
25 changes: 25 additions & 0 deletions cybozu/tcp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#ifndef CYBOZU_TCP_HPP
#define CYBOZU_TCP_HPP

#include "dynbuf.hpp"
#include "ip_address.hpp"
#include "reactor.hpp"
#include "util.hpp"
Expand Down Expand Up @@ -170,6 +171,30 @@ class tcp_socket: public resource {
});
}

bool recv(dynbuf& buf) {
return with_fd([this, &buf](int fd) -> bool {
char* p = buf.prepare(4096);
ssize_t n = ::recv(fd, p, 4096, 0);
if( n == -1 ) {
if( errno == EAGAIN || errno == EWOULDBLOCK )
return true;
if( errno == EINTR )
return true;
if( errno == ECONNRESET ) {
buf.reset();
return invalidate();
}
throw_unix_error(errno, "recv");
}
if( n == 0 ) {
buf.reset();
return invalidate();
}
buf.consume(n);
return true;
});
}

protected:
// Write out pending data.
//
Expand Down
6 changes: 3 additions & 3 deletions src/counter/sockets.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ counter_socket::~counter_socket() {
release_all();
}

bool counter_socket::on_readable() {
bool counter_socket::on_readable(int fd) {
if( m_busy.load(std::memory_order_acquire) ) {
m_reactor->add_readable(*this);
return true;
Expand All @@ -112,11 +112,11 @@ bool counter_socket::on_readable() {
return true;
}

bool counter_socket::on_writable() {
bool counter_socket::on_writable(int fd) {
cybozu::worker* w = m_finder();
if( w == nullptr ) {
// if there is no idle worker, fallback to the default.
return cybozu::tcp_socket::on_writable();
return cybozu::tcp_socket::on_writable(fd);
}

w->post_job(m_sendjob);
Expand Down
8 changes: 4 additions & 4 deletions src/counter/sockets.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ class counter_socket: public cybozu::tcp_socket {
std::unordered_map<const cybozu::hash_key*, std::uint32_t>
m_acquired_resources;

virtual void on_invalidate() override final {
virtual void on_invalidate(int fd) override final {
g_stats.curr_connections.fetch_sub(1);
cybozu::tcp_socket::on_invalidate();
cybozu::tcp_socket::on_invalidate(fd);
}

bool on_readable() override;
bool on_writable() override;
bool on_readable(int) override;
bool on_writable(int) override;

void cmd_get(const counter::request& cmd, counter::response& r);
void cmd_acquire(const counter::request& cmd, counter::response& r);
Expand Down

0 comments on commit 2f84580

Please sign in to comment.