Skip to content

Commit

Permalink
chore: preparation for basic http api
Browse files Browse the repository at this point in the history
The goal is to provide very basic support for simple commands,
fancy stuff like pipelining, blocking commands won't work.

1. Added optional registration for /api handler.
2. Implemented parsing of post body.
3. Added basic formatting routine for the response. It does not cover all the commands but should suffice for
   basic usage.

The API is a POST method and the body of the request should contain command arguments formatted as json array.
For example, `'["set", "foo", "bar", "ex", "100"]'`.
The response is a json object with either `result` field holding the response of the command or
`error` field containing the error message sent by the server.
See `test_http` test in tests/dragonfly/connection_test.py for more details.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange committed Mar 24, 2024
1 parent 3aa4a29 commit b7d996b
Show file tree
Hide file tree
Showing 8 changed files with 280 additions and 32 deletions.
2 changes: 1 addition & 1 deletion helio
9 changes: 5 additions & 4 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ void SendProtocolError(RedisParser::Result pres, SinkReplyBuilder* builder) {
// https://www.w3.org/Protocols/rfc2616/rfc2616-sec5.html
// One place to find a good implementation would be https://github.com/h2o/picohttpparser
bool MatchHttp11Line(string_view line) {
return absl::StartsWith(line, "GET ") && absl::EndsWith(line, "HTTP/1.1");
return (absl::StartsWith(line, "GET ") || absl::StartsWith(line, "POST ")) &&
absl::EndsWith(line, "HTTP/1.1");
}

void UpdateIoBufCapacity(const base::IoBuf& io_buf, ConnectionStats* stats,
Expand Down Expand Up @@ -651,11 +652,13 @@ void Connection::HandleRequests() {
http_res = CheckForHttpProto(peer);

if (http_res) {
cc_.reset(service_->CreateContext(peer, this));
if (*http_res) {
VLOG(1) << "HTTP1.1 identified";
is_http_ = true;
HttpConnection http_conn{http_listener_};
http_conn.SetSocket(peer);
http_conn.set_user_data(cc_.get());
auto ec = http_conn.ParseFromBuffer(io_buf_.InputBuffer());
io_buf_.ConsumeInput(io_buf_.InputLen());
if (!ec) {
Expand All @@ -666,17 +669,15 @@ void Connection::HandleRequests() {
// this connection.
http_conn.ReleaseSocket();
} else {
cc_.reset(service_->CreateContext(peer, this));
if (breaker_cb_) {
socket_->RegisterOnErrorCb([this](int32_t mask) { this->OnBreakCb(mask); });
}

ConnectionFlow(peer);

socket_->CancelOnErrorCb(); // noop if nothing is registered.

cc_.reset();
}
cc_.reset();
}

VLOG(1) << "Closed connection for peer " << remote_ep;
Expand Down
10 changes: 5 additions & 5 deletions src/facade/reply_capture.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,9 @@ class CapturingReplyBuilder : public RedisReplyBuilder {

void StartCollection(unsigned len, CollectionType type) override;

private:
public:
using Error = std::pair<std::string, std::string>; // SendError (msg, type)
using Null = std::nullptr_t; // SendNull or SendNullArray
struct SimpleString : public std::string {}; // SendSimpleString
struct BulkString : public std::string {}; // SendBulkString

struct StrArrPayload {
bool simple;
Expand All @@ -66,7 +64,9 @@ class CapturingReplyBuilder : public RedisReplyBuilder {
bool with_scores;
};

public:
struct SimpleString : public std::string {}; // SendSimpleString
struct BulkString : public std::string {}; // SendBulkString

CapturingReplyBuilder(ReplyMode mode = ReplyMode::FULL)
: RedisReplyBuilder{nullptr}, reply_mode_{mode}, stack_{}, current_{} {
}
Expand All @@ -89,7 +89,6 @@ class CapturingReplyBuilder : public RedisReplyBuilder {
// If an error is stored inside payload, get a reference to it.
static std::optional<ErrorRef> GetError(const Payload& pl);

private:
struct CollectionPayload {
CollectionPayload(unsigned len, CollectionType type);

Expand All @@ -98,6 +97,7 @@ class CapturingReplyBuilder : public RedisReplyBuilder {
std::vector<Payload> arr;
};

private:
private:
// Send payload directly, bypassing external interface. For efficient passing between two
// captures.
Expand Down
2 changes: 1 addition & 1 deletion src/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ SET(SEARCH_FILES search/search_family.cc search/doc_index.cc search/doc_accessor

add_library(dragonfly_lib engine_shard_set.cc channel_store.cc
config_registry.cc conn_context.cc debugcmd.cc dflycmd.cc
generic_family.cc hset_family.cc json_family.cc
generic_family.cc hset_family.cc http_api.cc json_family.cc
${SEARCH_FILES}
list_family.cc main_service.cc memory_cmd.cc rdb_load.cc rdb_save.cc replica.cc
protocol_client.cc
Expand Down
224 changes: 224 additions & 0 deletions src/server/http_api.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#include "server/http_api.h"

#include "base/logging.h"
#include "core/flatbuffers.h"
#include "facade/conn_context.h"
#include "facade/reply_builder.h"
#include "server/main_service.h"
#include "util/http/http_common.h"

namespace dfly {
using namespace util;
using namespace std;
namespace h2 = boost::beast::http;
using facade::CapturingReplyBuilder;

namespace {

bool IsValidReq(flexbuffers::Reference req) {
if (!req.IsVector()) {
return false;
}

auto vec = req.AsVector();
if (vec.size() == 0) {
return false;
}

for (size_t i = 0; i < vec.size(); ++i) {
if (!vec[i].IsString()) {
return false;
}
}
return true;
}

// Escape a string so that it is legal to print it in JSON text.
std::string JsonEscape(string_view input) {
auto hex_digit = [](int c) -> char { return c < 10 ? c + '0' : c - 10 + 'a'; };

string out;
out.reserve(input.size() + 2);
out.push_back('\"');

auto* p = reinterpret_cast<const unsigned char*>(input.begin());
auto* e = reinterpret_cast<const unsigned char*>(input.end());

while (p < e) {
if (*p == '\\' || *p == '\"') {
out.push_back('\\');
out.push_back(*p++);
} else if (*p <= 0x1f) {
switch (*p) {
case '\b':
out.append("\\b");
p++;
break;
case '\f':
out.append("\\f");
p++;
break;
case '\n':
out.append("\\n");
p++;
break;
case '\r':
out.append("\\r");
p++;
break;
case '\t':
out.append("\\t");
p++;
break;
default:
// this condition captures non readable chars with value < 32,
// so size = 1 byte (e.g control chars).
out.append("\\u00");
out.push_back(hex_digit((*p & 0xf0) >> 4));
out.push_back(hex_digit(*p & 0xf));
p++;
}
} else {
out.push_back(*p++);
}
}

out.push_back('\"');
return out;
}

struct CaptureVisitor {
CaptureVisitor() {
str = R"({"result":)";
}

void operator()(monostate) {
}

void operator()(long v) {
absl::StrAppend(&str, v);
}

void operator()(double v) {
absl::StrAppend(&str, v);
}

void operator()(const CapturingReplyBuilder::SimpleString& ss) {
absl::StrAppend(&str, "\"", ss, "\"");
}

void operator()(const CapturingReplyBuilder::BulkString& bs) {
absl::StrAppend(&str, JsonEscape(bs));
}

void operator()(CapturingReplyBuilder::Null) {
absl::StrAppend(&str, "null");
}

void operator()(CapturingReplyBuilder::Error err) {
str = absl::StrCat(R"({"error": ")", err.first);
}

void operator()(facade::OpStatus status) {
absl::StrAppend(&str, "\"", facade::StatusToMsg(status), "\"");
}

void operator()(const CapturingReplyBuilder::StrArrPayload& sa) {
absl::StrAppend(&str, "not_implemented");
}

void operator()(const unique_ptr<CapturingReplyBuilder::CollectionPayload>& cp) {
if (!cp) {
absl::StrAppend(&str, "null");
return;
}
if (cp->len == 0 && cp->type == facade::RedisReplyBuilder::ARRAY) {
absl::StrAppend(&str, "[]");
return;
}

absl::StrAppend(&str, "[");
for (auto& pl : cp->arr) {
visit(*this, std::move(pl));
}
}

void operator()(facade::SinkReplyBuilder::MGetResponse resp) {
absl::StrAppend(&str, "not_implemented");
}

void operator()(const CapturingReplyBuilder::ScoredArray& sarr) {
absl::StrAppend(&str, "[");
for (const auto& [key, score] : sarr.arr) {
absl::StrAppend(&str, "{", JsonEscape(key), ":", score, "},");
}
if (sarr.arr.size() > 0) {
str.pop_back();
}
absl::StrAppend(&str, "]");
}

string str;
};

} // namespace

void HttpAPI(const http::QueryArgs& args, HttpRequest&& req, Service* service,
HttpContext* http_cntx) {
auto& body = req.body();

flexbuffers::Builder fbb;
flatbuffers::Parser parser;
flexbuffers::Reference doc;
bool success = parser.ParseFlexBuffer(body.c_str(), nullptr, &fbb);
if (success) {
fbb.Finish();
doc = flexbuffers::GetRoot(fbb.GetBuffer());
if (!IsValidReq(doc)) {
success = false;
}
}

if (!success) {
auto response = http::MakeStringResponse(h2::status::bad_request);
http::SetMime(http::kTextMime, &response);
response.body() = "Failed to parse json\r\n";
http_cntx->Invoke(std::move(response));
return;
}

vector<string> cmd_args;
flexbuffers::Vector vec = doc.AsVector();
for (size_t i = 0; i < vec.size(); ++i) {
cmd_args.push_back(vec[i].AsString().c_str());
}
vector<facade::MutableSlice> cmd_slices(cmd_args.size());
for (size_t i = 0; i < cmd_args.size(); ++i) {
cmd_slices[i] = absl::MakeSpan(cmd_args[i]);
}

facade::ConnectionContext* context = (facade::ConnectionContext*)http_cntx->user_data();
DCHECK(context);

facade::CapturingReplyBuilder reply_builder;
auto* prev = context->Inject(&reply_builder);
// TODO: to finish this.
service->DispatchCommand(absl::MakeSpan(cmd_slices), context);
facade::CapturingReplyBuilder::Payload payload = reply_builder.Take();

context->Inject(prev);
auto response = http::MakeStringResponse();
http::SetMime(http::kJsonMime, &response);

CaptureVisitor visitor;
std::visit(visitor, std::move(payload));
visitor.str.append("}\r\n");
response.body() = visitor.str;
http_cntx->Invoke(std::move(response));
}

} // namespace dfly
16 changes: 16 additions & 0 deletions src/server/http_api.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#pragma once

#include "util/http/http_handler.h"

namespace dfly {
class Service;
using HttpRequest = util::HttpListenerBase::RequestType;

void HttpAPI(const util::http::QueryArgs& args, HttpRequest&& req, Service* service,
util::HttpContext* send);

} // namespace dfly
13 changes: 12 additions & 1 deletion src/server/main_service.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022, DragonflyDB authors. All rights reserved.
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

Expand Down Expand Up @@ -40,6 +40,7 @@ extern "C" {
#include "server/generic_family.h"
#include "server/hll_family.h"
#include "server/hset_family.h"
#include "server/http_api.h"
#include "server/json_family.h"
#include "server/list_family.h"
#include "server/multi_command_squasher.h"
Expand Down Expand Up @@ -83,6 +84,9 @@ ABSL_FLAG(bool, admin_nopass, false,
"If set, would enable open admin access to console on the assigned port, without "
"authorization needed.");

ABSL_FLAG(bool, expose_http_api, false,
"If set, will expose a POST /api handler for sending redis commands as json array.");

ABSL_FLAG(dfly::MemoryBytesFlag, maxmemory, dfly::MemoryBytesFlag{},
"Limit on maximum-memory that is used by the database. "
"0 - means the program will automatically determine its maximum memory usage. "
Expand Down Expand Up @@ -2441,6 +2445,13 @@ void Service::ConfigureHttpHandlers(util::HttpListenerBase* base, bool is_privil
base->RegisterCb("/clusterz", [this](const http::QueryArgs& args, HttpContext* send) {
return ClusterHtmlPage(args, send, &cluster_family_);
});

if (absl::GetFlag(FLAGS_expose_http_api)) {
base->RegisterCb("/api",
[this](const http::QueryArgs& args, HttpRequest&& req, HttpContext* send) {
HttpAPI(args, std::move(req), this, send);
});
}
}

void Service::OnClose(facade::ConnectionContext* cntx) {
Expand Down
Loading

0 comments on commit b7d996b

Please sign in to comment.