Skip to content

Commit

Permalink
feat(server): support epoll linux api
Browse files Browse the repository at this point in the history
  • Loading branch information
romange committed Oct 3, 2022
1 parent a1b800e commit 4d5c18d
Show file tree
Hide file tree
Showing 10 changed files with 117 additions and 61 deletions.
2 changes: 1 addition & 1 deletion .github/ISSUE_TEMPLATE/bug_report.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ If applicable, add screenshots to help explain your problem.

**Environment (please complete the following information):**
- OS: [ubuntu 20.04]
- Kernel: [MUST BE 5.10 or greater] # Command: `uname -a`
- Kernel: # Command: `uname -a`
- Containerized?: [Bare Metal, Docker, Docker Compose, Docker Swarm, Kubernetes, Other]
- Dragonfly Version: [e.g. 0.3.0]

Expand Down
9 changes: 4 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,8 @@ For more info about memory efficiency in Dragonfly see [dashtable doc](/docs/das

## Running the server

Dragonfly runs on linux. It uses relatively new linux specific [io-uring API](https://github.com/axboe/liburing)
for I/O, hence it requires Linux version 5.10 or later.
Debian/Bullseye, Ubuntu 20.04.4 or later fit these requirements.
Dragonfly runs on linux. We advice running it on linux version 5.11 or later
but you can also run Dragonfly on older kernels as well.


### With docker:
Expand Down Expand Up @@ -141,9 +140,9 @@ In addition, it has Dragonfly specific arguments options:
`keys` is a dangerous command. We truncate its result to avoid blowup in memory when fetching too many keys.
* `dbnum` - maximum number of supported databases for `select`.
* `cache_mode` - see [Cache](#novel-cache-design) section below.
* `hz` - key expiry evaluation frequency. Default is 1000. Lower frequency uses less cpu when
* `hz` - key expiry evaluation frequency. Default is 1000. Lower frequency uses less cpu when
idle at the expense of precision in key eviction.
* `save_schedule` - glob spec for the UTC time to save a snapshot which matches HH:MM (24h time). default: ""
* `save_schedule` - glob spec for the UTC time to save a snapshot which matches HH:MM (24h time). default: ""
* `keys_output_limit` - Maximum number of keys output by keys command. default: 8192


Expand Down
7 changes: 3 additions & 4 deletions docs/build-from-source.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@

## Running the server

Dragonfly runs on linux. It uses relatively new linux specific [io-uring API](https://github.com/axboe/liburing)
for I/O, hence it requires `Linux verion 5.10` or later.
Debian/Bullseye, `Ubuntu 20.04.4` or later fit these requirements.
Dragonfly runs on linux. We advice running it on linux version 5.11 or later
but you can also run Dragonfly on older kernels as well.

### WARNING: Building from source on older kernels WILL NOT WORK.

Expand Down Expand Up @@ -58,7 +57,7 @@ OK
1) "hello"
127.0.0.1:6379> get hello
"world"
127.0.0.1:6379>
127.0.0.1:6379>
```

## Step 6
Expand Down
6 changes: 1 addition & 5 deletions docs/quick-start/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ OK
1) "hello"
127.0.0.1:6379> get hello
"world"
127.0.0.1:6379>
127.0.0.1:6379>
```

## Step 3
Expand All @@ -46,10 +46,6 @@ Continue being great and build your app with the power of DragonflyDB!

## Known issues

#### `Error initializing io_uring`

This likely means your kernel version is too low to run DragonflyDB. Make sure to install
a kernel version that supports `io_uring`.

## More Build Options
- [Docker Compose Deployment](/contrib/docker/)
Expand Down
2 changes: 1 addition & 1 deletion helio
13 changes: 5 additions & 8 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
#include "util/tls/tls_socket.h"
#endif

#include "util/uring/uring_socket.h"

ABSL_FLAG(bool, tcp_nodelay, false,
"Configures dragonfly connections with socket option TCP_NODELAY");
ABSL_FLAG(bool, http_admin_console, true, "If true allows accessing http console on main TCP port");
Expand Down Expand Up @@ -162,8 +160,8 @@ void Connection::OnShutdown() {
void Connection::OnPreMigrateThread() {
// If we migrating to another io_uring we should cancel any pending requests we have.
if (break_poll_id_ != kuint32max) {
uring::UringSocket* us = static_cast<uring::UringSocket*>(socket_.get());
us->CancelPoll(break_poll_id_);
auto* ls = static_cast<LinuxSocketBase*>(socket_.get());
ls->CancelPoll(break_poll_id_);
break_poll_id_ = kuint32max;
}
}
Expand All @@ -173,9 +171,9 @@ void Connection::OnPostMigrateThread() {
if (breaker_cb_) {
DCHECK_EQ(kuint32max, break_poll_id_);

uring::UringSocket* us = static_cast<uring::UringSocket*>(socket_.get());
auto* ls = static_cast<LinuxSocketBase*>(socket_.get());
break_poll_id_ =
us->PollEvent(POLLERR | POLLHUP, [this](int32_t mask) { this->OnBreakCb(mask); });
ls->PollEvent(POLLERR | POLLHUP, [this](int32_t mask) { this->OnBreakCb(mask); });
}
}

Expand Down Expand Up @@ -242,8 +240,7 @@ void Connection::HandleRequests() {
} else {
cc_.reset(service_->CreateContext(peer, this));

// TODO: to move this interface to LinuxSocketBase so we won't need to cast.
uring::UringSocket* us = static_cast<uring::UringSocket*>(socket_.get());
auto* us = static_cast<LinuxSocketBase*>(socket_.get());
if (breaker_cb_) {
break_poll_id_ =
us->PollEvent(POLLERR | POLLHUP, [this](int32_t mask) { this->OnBreakCb(mask); });
Expand Down
2 changes: 1 addition & 1 deletion src/server/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
add_executable(dragonfly dfly_main.cc)
cxx_link(dragonfly base dragonfly_lib)
cxx_link(dragonfly base dragonfly_lib epoll_fiber_lib)

if (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND CMAKE_BUILD_TYPE STREQUAL "Release")
# Add core2 only to this file, thus avoiding instructions in this object file that
Expand Down
82 changes: 60 additions & 22 deletions src/server/dfly_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "server/version.h"
#include "strings/human_readable.h"
#include "util/accept_server.h"
#include "util/epoll/epoll_pool.h"
#include "util/uring/uring_pool.h"
#include "util/varz.h"

Expand All @@ -52,6 +53,10 @@ ABSL_FLAG(string, unixsocket, "",
"If not empty - specifies path for the Unis socket that will "
"be used for listening for incoming connections.");

ABSL_FLAG(bool, force_epoll, false,
"If true - uses linux epoll engine underneath."
"Can fit for kernels older than 5.10.");

using namespace util;
using namespace facade;
using namespace io;
Expand Down Expand Up @@ -193,6 +198,42 @@ bool CreatePidFile(const string& path) {
return true;
}

bool ShouldUseEpollAPI(const base::sys::KernelVersion& kver) {
if (GetFlag(FLAGS_force_epoll))
return true;

if (kver.kernel < 5 || (kver.kernel == 5 && kver.major < 10)) {
LOG(WARNING) << "Kernel is older than 5.10, switching to epoll engine.";
return true;
}

struct io_uring ring;
io_uring_params params;
memset(&params, 0, sizeof(params));

int iouring_res = io_uring_queue_init_params(1024, &ring, &params);

if (iouring_res == 0) {
io_uring_queue_exit(&ring);
return false;
}

iouring_res = -iouring_res;

if (iouring_res == ENOSYS) {
LOG(WARNING) << "iouring API is not supported. switching to epoll.";
} else if (iouring_res == ENOMEM) {
LOG(WARNING) << "io_uring does not have enough memory. That can happen when your "
"max locked memory is too limited. If you run via docker, "
"try adding '--ulimit memlock=-1' to \"docker run\" command."
"Meanwhile, switching to epoll";
} else {
LOG(WARNING) << "Weird error " << iouring_res << " switching to epoll";
}

return true;
}

} // namespace
} // namespace dfly

Expand Down Expand Up @@ -237,28 +278,11 @@ Usage: dragonfly [FLAGS]
CHECK_GT(GetFlag(FLAGS_port), 0u);
mi_stats_reset();

base::sys::KernelVersion kver;
base::sys::GetKernelVersion(&kver);

if (kver.kernel < 5 || (kver.kernel == 5 && kver.major < 10)) {
LOG(ERROR) << "Kernel 5.10 or later is supported. Exiting...";
return 1;
}

int iouring_res = io_uring_queue_init_params(0, nullptr, nullptr);
if (-iouring_res == ENOSYS) {
LOG(ERROR) << "iouring system call interface is not supported. Exiting...";
return 1;
}

if (GetFlag(FLAGS_dbnum) > dfly::kMaxDbId) {
LOG(ERROR) << "dbnum is too big. Exiting...";
return 1;
}

CHECK_LT(kver.major, 99u);
dfly::kernel_version = kver.kernel * 100 + kver.major;

string pidfile_path = GetFlag(FLAGS_pidfile);
if (!pidfile_path.empty()) {
if (!CreatePidFile(pidfile_path)) {
Expand Down Expand Up @@ -287,14 +311,28 @@ Usage: dragonfly [FLAGS]
mi_option_enable(mi_option_show_errors);
mi_option_set(mi_option_max_warnings, 0);

uring::UringPool pp{1024};
pp.Run();
base::sys::KernelVersion kver;
base::sys::GetKernelVersion(&kver);

CHECK_LT(kver.major, 99u);
dfly::kernel_version = kver.kernel * 100 + kver.major;

unique_ptr<util::ProactorPool> pool;

bool use_epoll = ShouldUseEpollAPI(kver);
if (use_epoll) {
pool.reset(new epoll::EpollPool);
} else {
pool.reset(new uring::UringPool(1024)); // 1024 - iouring queue size.
}

pool->Run();

AcceptServer acceptor(&pp);
AcceptServer acceptor(pool.get());

int res = dfly::RunEngine(&pp, &acceptor) ? 0 : -1;
int res = dfly::RunEngine(pool.get(), &acceptor) ? 0 : -1;

pp.Stop();
pool->Stop();

if (!pidfile_path.empty()) {
unlink(pidfile_path.c_str());
Expand Down
54 changes: 40 additions & 14 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ extern "C" {
#include "server/version.h"
#include "strings/human_readable.h"
#include "util/accept_server.h"
#include "util/fibers/fiber_file.h"
#include "util/uring/uring_file.h"

using namespace std;
Expand All @@ -66,6 +67,7 @@ using absl::StrCat;
using namespace facade;
using strings::HumanReadableNumBytes;
using util::ProactorBase;
using util::fibers_ext::FiberQueueThreadPool;
using util::http::StringResponse;

namespace {
Expand Down Expand Up @@ -169,7 +171,8 @@ class LinuxWriteWrapper : public io::Sink {

class RdbSnapshot {
public:
RdbSnapshot() {}
RdbSnapshot(FiberQueueThreadPool* fq_tp) : fq_tp_(fq_tp) {
}

error_code Start(bool single_shard, const std::string& path, const StringVec& lua_scripts);
void StartInShard(EngineShard* shard);
Expand All @@ -187,13 +190,12 @@ class RdbSnapshot {

private:
bool started_ = false;

FiberQueueThreadPool* fq_tp_;
std::unique_ptr<io::Sink> io_sink_;
std::unique_ptr<RdbSaver> saver_;
RdbTypeFreqMap freq_map_;
};


io::Result<size_t> LinuxWriteWrapper::WriteSome(const iovec* v, uint32_t len) {
io::Result<size_t> res = lf_->WriteSome(v, len, offset_, 0);
if (res) {
Expand All @@ -203,15 +205,24 @@ io::Result<size_t> LinuxWriteWrapper::WriteSome(const iovec* v, uint32_t len) {
return res;
}

error_code RdbSnapshot::Start(bool sharded_snapshot,
const std::string& path, const StringVec& lua_scripts) {
auto res = uring::OpenLinux(path, kRdbWriteFlags, 0666);
if (!res) {
return res.error();
error_code RdbSnapshot::Start(bool sharded_snapshot, const std::string& path,
const StringVec& lua_scripts) {
bool is_direct = false;
if (fq_tp_) { // EPOLL
auto res = util::OpenFiberWriteFile(path, fq_tp_);
if (!res)
return res.error();
io_sink_.reset(*res);
} else {
auto res = uring::OpenLinux(path, kRdbWriteFlags, 0666);
if (!res) {
return res.error();
}
io_sink_.reset(new LinuxWriteWrapper(res->release()));
is_direct = kRdbWriteFlags & O_DIRECT;
}

io_sink_.reset(new LinuxWriteWrapper(res->release()));
saver_.reset(new RdbSaver(io_sink_.get(), sharded_snapshot, kRdbWriteFlags & O_DIRECT));
saver_.reset(new RdbSaver(io_sink_.get(), sharded_snapshot, is_direct));

return saver_->SaveHeader(lua_scripts);
}
Expand All @@ -222,6 +233,9 @@ error_code RdbSnapshot::SaveBody() {

error_code RdbSnapshot::Close() {
// TODO: to solve it in a more elegant way.
if (fq_tp_) {
return static_cast<io::WriteFile*>(io_sink_.get())->Close();
}
return static_cast<LinuxWriteWrapper*>(io_sink_.get())->Close();
}

Expand Down Expand Up @@ -330,6 +344,9 @@ void ServerFamily::Init(util::AcceptServer* acceptor, util::ListenerInterface* m
dfly_cmd_.reset(new DflyCmd(main_listener, this));

pb_task_ = shard_set->pool()->GetNextProactor();
if (pb_task_->GetKind() == ProactorBase::EPOLL) {
fq_threadpool_.reset(new FiberQueueThreadPool());
}

// Unlike EngineShard::Heartbeat that runs independently in each shard thread,
// this callback runs in a single thread and it aggregates globally stats from all the shards.
Expand Down Expand Up @@ -489,8 +506,14 @@ void ServerFamily::SnapshotScheduling(const SnapshotSpec&& spec) {
}

error_code ServerFamily::LoadRdb(const std::string& rdb_file) {
io::ReadonlyFileOrError res = uring::OpenRead(rdb_file);
error_code ec;
io::ReadonlyFileOrError res;

if (fq_threadpool_) {
res = util::OpenFiberReadFile(rdb_file, fq_threadpool_.get());
} else {
res = uring::OpenRead(rdb_file);
}

if (res) {
io::FileSource fs(*res);
Expand Down Expand Up @@ -777,7 +800,7 @@ error_code ServerFamily::DoSave(bool new_version, Transaction* trans, string* er
abs_path += shard_file;

VLOG(1) << "Saving to " << abs_path;
snapshots[sid].reset(new RdbSnapshot);
snapshots[sid].reset(new RdbSnapshot(fq_threadpool_.get()));
auto& snapshot = snapshots[sid];
error_code local_ec = snapshot->Start(true, abs_path.generic_string(), lua_scripts);
if (local_ec) {
Expand All @@ -800,7 +823,7 @@ error_code ServerFamily::DoSave(bool new_version, Transaction* trans, string* er
path += filename;
VLOG(1) << "Saving to " << path;

snapshots[0].reset(new RdbSnapshot);
snapshots[0].reset(new RdbSnapshot(fq_threadpool_.get()));
ec = snapshots[0]->Start(false, path.generic_string(), lua_scripts);

if (!ec) {
Expand Down Expand Up @@ -1104,12 +1127,15 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
Metrics m = GetMetrics();

if (should_enter("SERVER")) {
ProactorBase::ProactorKind kind = ProactorBase::me()->GetKind();
const char* multiplex_api = (kind == ProactorBase::IOURING) ? "iouring" : "epoll";

ADD_HEADER("# Server");

append("redis_version", GetVersion());
append("redis_mode", "standalone");
append("arch_bits", 64);
append("multiplexing_api", "iouring");
append("multiplexing_api", multiplex_api);
append("tcp_port", GetFlag(FLAGS_port));

size_t uptime = m.uptime;
Expand Down
Loading

0 comments on commit 4d5c18d

Please sign in to comment.