Skip to content

Commit

Permalink
feat(server): support epoll linux api
Browse files Browse the repository at this point in the history
  • Loading branch information
romange committed Oct 3, 2022
1 parent a1b800e commit ff586d5
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 46 deletions.
2 changes: 1 addition & 1 deletion helio
13 changes: 5 additions & 8 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
#include "util/tls/tls_socket.h"
#endif

#include "util/uring/uring_socket.h"

ABSL_FLAG(bool, tcp_nodelay, false,
"Configures dragonfly connections with socket option TCP_NODELAY");
ABSL_FLAG(bool, http_admin_console, true, "If true allows accessing http console on main TCP port");
Expand Down Expand Up @@ -162,8 +160,8 @@ void Connection::OnShutdown() {
void Connection::OnPreMigrateThread() {
// If we are migrating to another io_uring, we should cancel any pending requests we have.
if (break_poll_id_ != kuint32max) {
uring::UringSocket* us = static_cast<uring::UringSocket*>(socket_.get());
us->CancelPoll(break_poll_id_);
auto* ls = static_cast<LinuxSocketBase*>(socket_.get());
ls->CancelPoll(break_poll_id_);
break_poll_id_ = kuint32max;
}
}
Expand All @@ -173,9 +171,9 @@ void Connection::OnPostMigrateThread() {
if (breaker_cb_) {
DCHECK_EQ(kuint32max, break_poll_id_);

uring::UringSocket* us = static_cast<uring::UringSocket*>(socket_.get());
auto* ls = static_cast<LinuxSocketBase*>(socket_.get());
break_poll_id_ =
us->PollEvent(POLLERR | POLLHUP, [this](int32_t mask) { this->OnBreakCb(mask); });
ls->PollEvent(POLLERR | POLLHUP, [this](int32_t mask) { this->OnBreakCb(mask); });
}
}

Expand Down Expand Up @@ -242,8 +240,7 @@ void Connection::HandleRequests() {
} else {
cc_.reset(service_->CreateContext(peer, this));

// TODO: to move this interface to LinuxSocketBase so we won't need to cast.
uring::UringSocket* us = static_cast<uring::UringSocket*>(socket_.get());
auto* us = static_cast<LinuxSocketBase*>(socket_.get());
if (breaker_cb_) {
break_poll_id_ =
us->PollEvent(POLLERR | POLLHUP, [this](int32_t mask) { this->OnBreakCb(mask); });
Expand Down
2 changes: 1 addition & 1 deletion src/server/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
add_executable(dragonfly dfly_main.cc)
cxx_link(dragonfly base dragonfly_lib)
cxx_link(dragonfly base dragonfly_lib epoll_fiber_lib)

if (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND CMAKE_BUILD_TYPE STREQUAL "Release")
# Add core2 only to this file, thus avoiding instructions in this object file that
Expand Down
82 changes: 60 additions & 22 deletions src/server/dfly_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "server/version.h"
#include "strings/human_readable.h"
#include "util/accept_server.h"
#include "util/epoll/epoll_pool.h"
#include "util/uring/uring_pool.h"
#include "util/varz.h"

Expand All @@ -52,6 +53,10 @@ ABSL_FLAG(string, unixsocket, "",
"If not empty - specifies path for the Unis socket that will "
"be used for listening for incoming connections.");

// Command-line switch that makes the server pick the epoll engine regardless of
// what the kernel supports. Note the trailing space in the first literal: adjacent
// string literals are concatenated verbatim, so without it the help text would
// read "underneath.Can fit".
ABSL_FLAG(bool, force_epoll, false,
          "If true - uses linux epoll engine underneath. "
          "Can fit for kernels older than 5.10.");

using namespace util;
using namespace facade;
using namespace io;
Expand Down Expand Up @@ -193,6 +198,42 @@ bool CreatePidFile(const string& path) {
return true;
}

// Decides whether the server should run on the epoll engine instead of io_uring.
//
// Returns true when:
//   * the force_epoll flag is set,
//   * the kernel described by `kver` is older than 5.10, or
//   * a probe io_uring instance (1024 entries) cannot be created; the reason
//     is logged as a warning.
// Returns false only when the probe ring was created successfully. The probe
// ring is destroyed before returning in that case.
bool ShouldUseEpollAPI(const base::sys::KernelVersion& kver) {
  if (GetFlag(FLAGS_force_epoll))
    return true;

  if (kver.kernel < 5 || (kver.kernel == 5 && kver.major < 10)) {
    LOG(WARNING) << "Kernel is older than 5.10, switching to epoll engine.";
    return true;
  }

  // Try to set up a throw-away ring to verify that io_uring is actually usable.
  struct io_uring ring;
  io_uring_params params;
  memset(&params, 0, sizeof(params));

  int iouring_res = io_uring_queue_init_params(1024, &ring, &params);

  if (iouring_res == 0) {
    io_uring_queue_exit(&ring);
    return false;
  }

  // The call reports failures as a negative errno value; flip the sign so it
  // can be compared against the E* constants.
  iouring_res = -iouring_res;

  if (iouring_res == ENOSYS) {
    LOG(WARNING) << "iouring API is not supported. switching to epoll.";
  } else if (iouring_res == ENOMEM) {
    // Each literal ends with a space so the concatenated message keeps its
    // sentences separated (previously it printed "command.Meanwhile").
    LOG(WARNING) << "io_uring does not have enough memory. That can happen when your "
                    "max locked memory is too limited. If you run via docker, "
                    "try adding '--ulimit memlock=-1' to \"docker run\" command. "
                    "Meanwhile, switching to epoll";
  } else {
    LOG(WARNING) << "Weird error " << iouring_res << " switching to epoll";
  }

  return true;
}

} // namespace
} // namespace dfly

Expand Down Expand Up @@ -237,28 +278,11 @@ Usage: dragonfly [FLAGS]
CHECK_GT(GetFlag(FLAGS_port), 0u);
mi_stats_reset();

base::sys::KernelVersion kver;
base::sys::GetKernelVersion(&kver);

if (kver.kernel < 5 || (kver.kernel == 5 && kver.major < 10)) {
LOG(ERROR) << "Kernel 5.10 or later is supported. Exiting...";
return 1;
}

int iouring_res = io_uring_queue_init_params(0, nullptr, nullptr);
if (-iouring_res == ENOSYS) {
LOG(ERROR) << "iouring system call interface is not supported. Exiting...";
return 1;
}

if (GetFlag(FLAGS_dbnum) > dfly::kMaxDbId) {
LOG(ERROR) << "dbnum is too big. Exiting...";
return 1;
}

CHECK_LT(kver.major, 99u);
dfly::kernel_version = kver.kernel * 100 + kver.major;

string pidfile_path = GetFlag(FLAGS_pidfile);
if (!pidfile_path.empty()) {
if (!CreatePidFile(pidfile_path)) {
Expand Down Expand Up @@ -287,14 +311,28 @@ Usage: dragonfly [FLAGS]
mi_option_enable(mi_option_show_errors);
mi_option_set(mi_option_max_warnings, 0);

uring::UringPool pp{1024};
pp.Run();
base::sys::KernelVersion kver;
base::sys::GetKernelVersion(&kver);

CHECK_LT(kver.major, 99u);
dfly::kernel_version = kver.kernel * 100 + kver.major;

unique_ptr<util::ProactorPool> pool;

bool use_epoll = ShouldUseEpollAPI(kver);
if (use_epoll) {
pool.reset(new epoll::EpollPool);
} else {
pool.reset(new uring::UringPool(1024)); // 1024 - iouring queue size.
}

pool->Run();

AcceptServer acceptor(&pp);
AcceptServer acceptor(pool.get());

int res = dfly::RunEngine(&pp, &acceptor) ? 0 : -1;
int res = dfly::RunEngine(pool.get(), &acceptor) ? 0 : -1;

pp.Stop();
pool->Stop();

if (!pidfile_path.empty()) {
unlink(pidfile_path.c_str());
Expand Down
54 changes: 40 additions & 14 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ extern "C" {
#include "server/version.h"
#include "strings/human_readable.h"
#include "util/accept_server.h"
#include "util/fibers/fiber_file.h"
#include "util/uring/uring_file.h"

using namespace std;
Expand All @@ -66,6 +67,7 @@ using absl::StrCat;
using namespace facade;
using strings::HumanReadableNumBytes;
using util::ProactorBase;
using util::fibers_ext::FiberQueueThreadPool;
using util::http::StringResponse;

namespace {
Expand Down Expand Up @@ -169,7 +171,8 @@ class LinuxWriteWrapper : public io::Sink {

class RdbSnapshot {
public:
RdbSnapshot() {}
RdbSnapshot(FiberQueueThreadPool* fq_tp) : fq_tp_(fq_tp) {
}

error_code Start(bool single_shard, const std::string& path, const StringVec& lua_scripts);
void StartInShard(EngineShard* shard);
Expand All @@ -187,13 +190,12 @@ class RdbSnapshot {

private:
bool started_ = false;

FiberQueueThreadPool* fq_tp_;
std::unique_ptr<io::Sink> io_sink_;
std::unique_ptr<RdbSaver> saver_;
RdbTypeFreqMap freq_map_;
};


io::Result<size_t> LinuxWriteWrapper::WriteSome(const iovec* v, uint32_t len) {
io::Result<size_t> res = lf_->WriteSome(v, len, offset_, 0);
if (res) {
Expand All @@ -203,15 +205,24 @@ io::Result<size_t> LinuxWriteWrapper::WriteSome(const iovec* v, uint32_t len) {
return res;
}

error_code RdbSnapshot::Start(bool sharded_snapshot,
const std::string& path, const StringVec& lua_scripts) {
auto res = uring::OpenLinux(path, kRdbWriteFlags, 0666);
if (!res) {
return res.error();
error_code RdbSnapshot::Start(bool sharded_snapshot, const std::string& path,
const StringVec& lua_scripts) {
bool is_direct = false;
if (fq_tp_) { // EPOLL
auto res = util::OpenFiberWriteFile(path, fq_tp_);
if (!res)
return res.error();
io_sink_.reset(*res);
} else {
auto res = uring::OpenLinux(path, kRdbWriteFlags, 0666);
if (!res) {
return res.error();
}
io_sink_.reset(new LinuxWriteWrapper(res->release()));
is_direct = kRdbWriteFlags & O_DIRECT;
}

io_sink_.reset(new LinuxWriteWrapper(res->release()));
saver_.reset(new RdbSaver(io_sink_.get(), sharded_snapshot, kRdbWriteFlags & O_DIRECT));
saver_.reset(new RdbSaver(io_sink_.get(), sharded_snapshot, is_direct));

return saver_->SaveHeader(lua_scripts);
}
Expand All @@ -222,6 +233,9 @@ error_code RdbSnapshot::SaveBody() {

// Closes the underlying snapshot sink and returns the resulting error code.
// The concrete sink type is selected by whether a fiber-queue thread pool was
// supplied: without one the sink is treated as a LinuxWriteWrapper, with one
// it is treated as an io::WriteFile.
error_code RdbSnapshot::Close() {
  // TODO: to solve it in a more elegant way.
  auto* sink = io_sink_.get();

  if (!fq_tp_) {
    return static_cast<LinuxWriteWrapper*>(sink)->Close();
  }

  return static_cast<io::WriteFile*>(sink)->Close();
}

Expand Down Expand Up @@ -330,6 +344,9 @@ void ServerFamily::Init(util::AcceptServer* acceptor, util::ListenerInterface* m
dfly_cmd_.reset(new DflyCmd(main_listener, this));

pb_task_ = shard_set->pool()->GetNextProactor();
if (pb_task_->GetKind() == ProactorBase::EPOLL) {
fq_threadpool_.reset(new FiberQueueThreadPool());
}

// Unlike EngineShard::Heartbeat that runs independently in each shard thread,
// this callback runs in a single thread and it aggregates globally stats from all the shards.
Expand Down Expand Up @@ -489,8 +506,14 @@ void ServerFamily::SnapshotScheduling(const SnapshotSpec&& spec) {
}

error_code ServerFamily::LoadRdb(const std::string& rdb_file) {
io::ReadonlyFileOrError res = uring::OpenRead(rdb_file);
error_code ec;
io::ReadonlyFileOrError res;

if (fq_threadpool_) {
res = util::OpenFiberReadFile(rdb_file, fq_threadpool_.get());
} else {
res = uring::OpenRead(rdb_file);
}

if (res) {
io::FileSource fs(*res);
Expand Down Expand Up @@ -777,7 +800,7 @@ error_code ServerFamily::DoSave(bool new_version, Transaction* trans, string* er
abs_path += shard_file;

VLOG(1) << "Saving to " << abs_path;
snapshots[sid].reset(new RdbSnapshot);
snapshots[sid].reset(new RdbSnapshot(fq_threadpool_.get()));
auto& snapshot = snapshots[sid];
error_code local_ec = snapshot->Start(true, abs_path.generic_string(), lua_scripts);
if (local_ec) {
Expand All @@ -800,7 +823,7 @@ error_code ServerFamily::DoSave(bool new_version, Transaction* trans, string* er
path += filename;
VLOG(1) << "Saving to " << path;

snapshots[0].reset(new RdbSnapshot);
snapshots[0].reset(new RdbSnapshot(fq_threadpool_.get()));
ec = snapshots[0]->Start(false, path.generic_string(), lua_scripts);

if (!ec) {
Expand Down Expand Up @@ -1104,12 +1127,15 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
Metrics m = GetMetrics();

if (should_enter("SERVER")) {
ProactorBase::ProactorKind kind = ProactorBase::me()->GetKind();
const char* multiplex_api = (kind == ProactorBase::IOURING) ? "iouring" : "epoll";

ADD_HEADER("# Server");

append("redis_version", GetVersion());
append("redis_mode", "standalone");
append("arch_bits", 64);
append("multiplexing_api", "iouring");
append("multiplexing_api", multiplex_api);
append("tcp_port", GetFlag(FLAGS_port));

size_t uptime = m.uptime;
Expand Down
1 change: 1 addition & 0 deletions src/server/server_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ class ServerFamily {
std::atomic_bool is_saving_{false};

util::fibers_ext::Done is_snapshot_done_;
std::unique_ptr<util::fibers_ext::FiberQueueThreadPool> fq_threadpool_;
};

} // namespace dfly

0 comments on commit ff586d5

Please sign in to comment.