Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: preparation for basic http api #2764

Merged
merged 3 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion helio
9 changes: 5 additions & 4 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ void SendProtocolError(RedisParser::Result pres, SinkReplyBuilder* builder) {
// https://www.w3.org/Protocols/rfc2616/rfc2616-sec5.html
// One place to find a good implementation would be https://github.com/h2o/picohttpparser
bool MatchHttp11Line(string_view line) {
return absl::StartsWith(line, "GET ") && absl::EndsWith(line, "HTTP/1.1");
return (absl::StartsWith(line, "GET ") || absl::StartsWith(line, "POST ")) &&
absl::EndsWith(line, "HTTP/1.1");
}

void UpdateIoBufCapacity(const base::IoBuf& io_buf, ConnectionStats* stats,
Expand Down Expand Up @@ -651,11 +652,13 @@ void Connection::HandleRequests() {
http_res = CheckForHttpProto(peer);

if (http_res) {
cc_.reset(service_->CreateContext(peer, this));
if (*http_res) {
VLOG(1) << "HTTP1.1 identified";
is_http_ = true;
HttpConnection http_conn{http_listener_};
http_conn.SetSocket(peer);
http_conn.set_user_data(cc_.get());
auto ec = http_conn.ParseFromBuffer(io_buf_.InputBuffer());
io_buf_.ConsumeInput(io_buf_.InputLen());
if (!ec) {
Expand All @@ -666,17 +669,15 @@ void Connection::HandleRequests() {
// this connection.
http_conn.ReleaseSocket();
} else {
cc_.reset(service_->CreateContext(peer, this));
if (breaker_cb_) {
socket_->RegisterOnErrorCb([this](int32_t mask) { this->OnBreakCb(mask); });
}

ConnectionFlow(peer);

socket_->CancelOnErrorCb(); // noop if nothing is registered.

cc_.reset();
}
cc_.reset();
}

VLOG(1) << "Closed connection for peer " << remote_ep;
Expand Down
9 changes: 4 additions & 5 deletions src/facade/reply_capture.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,9 @@ class CapturingReplyBuilder : public RedisReplyBuilder {

void StartCollection(unsigned len, CollectionType type) override;

private:
public:
using Error = std::pair<std::string, std::string>; // SendError (msg, type)
using Null = std::nullptr_t; // SendNull or SendNullArray
struct SimpleString : public std::string {}; // SendSimpleString
struct BulkString : public std::string {}; // SendBulkString

struct StrArrPayload {
bool simple;
Expand All @@ -66,7 +64,9 @@ class CapturingReplyBuilder : public RedisReplyBuilder {
bool with_scores;
};

public:
struct SimpleString : public std::string {}; // SendSimpleString
struct BulkString : public std::string {}; // SendBulkString

CapturingReplyBuilder(ReplyMode mode = ReplyMode::FULL)
: RedisReplyBuilder{nullptr}, reply_mode_{mode}, stack_{}, current_{} {
}
Expand All @@ -89,7 +89,6 @@ class CapturingReplyBuilder : public RedisReplyBuilder {
// If an error is stored inside payload, get a reference to it.
static std::optional<ErrorRef> GetError(const Payload& pl);

private:
struct CollectionPayload {
CollectionPayload(unsigned len, CollectionType type);

Expand Down
2 changes: 1 addition & 1 deletion src/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ SET(SEARCH_FILES search/search_family.cc search/doc_index.cc search/doc_accessor

add_library(dragonfly_lib engine_shard_set.cc channel_store.cc
config_registry.cc conn_context.cc debugcmd.cc dflycmd.cc
generic_family.cc hset_family.cc json_family.cc
generic_family.cc hset_family.cc http_api.cc json_family.cc
${SEARCH_FILES}
list_family.cc main_service.cc memory_cmd.cc rdb_load.cc rdb_save.cc replica.cc
protocol_client.cc
Expand Down
228 changes: 228 additions & 0 deletions src/server/http_api.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#include "server/http_api.h"

#include "base/logging.h"
#include "core/flatbuffers.h"
#include "facade/conn_context.h"
#include "facade/reply_builder.h"
#include "server/main_service.h"
#include "util/http/http_common.h"

namespace dfly {
using namespace util;
using namespace std;
namespace h2 = boost::beast::http;
using facade::CapturingReplyBuilder;

namespace {

bool IsVectorOfStrings(flexbuffers::Reference req) {
if (!req.IsVector()) {
return false;
}

auto vec = req.AsVector();
romange marked this conversation as resolved.
Show resolved Hide resolved
if (vec.size() == 0) {
return false;
}

for (size_t i = 0; i < vec.size(); ++i) {
if (!vec[i].IsString()) {
return false;
}
}
return true;
}

// Escape a string so that it is legal to print it in JSON text.
std::string JsonEscape(string_view input) {
auto hex_digit = [](unsigned c) -> char {
DCHECK_LT(c, 0xFu);
return c < 10 ? c + '0' : c - 10 + 'a';
};

string out;
out.reserve(input.size() + 2);
out.push_back('\"');

auto p = input.begin();
auto e = input.end();

while (p < e) {
uint8_t c = *p;
if (c == '\\' || c == '\"') {
out.push_back('\\');
out.push_back(*p++);
} else if (c <= 0x1f) {
switch (c) {
case '\b':
out.append("\\b");
p++;
break;
case '\f':
out.append("\\f");
p++;
break;
case '\n':
out.append("\\n");
p++;
break;
case '\r':
out.append("\\r");
p++;
break;
case '\t':
out.append("\\t");
p++;
break;
default:
// this condition captures non readable chars with value < 32,
romange marked this conversation as resolved.
Show resolved Hide resolved
// so size = 1 byte (e.g control chars).
out.append("\\u00");
out.push_back(hex_digit((c & 0xf0) >> 4));
out.push_back(hex_digit(c & 0xf));
p++;
}
} else {
out.push_back(*p++);
}
}

out.push_back('\"');
return out;
}

struct CaptureVisitor {
CaptureVisitor() {
str = R"({"result":)";
}

void operator()(monostate) {
}

void operator()(long v) {
absl::StrAppend(&str, v);
}

void operator()(double v) {
absl::StrAppend(&str, v);
}

void operator()(const CapturingReplyBuilder::SimpleString& ss) {
absl::StrAppend(&str, "\"", ss, "\"");
}

void operator()(const CapturingReplyBuilder::BulkString& bs) {
absl::StrAppend(&str, JsonEscape(bs));
}

void operator()(CapturingReplyBuilder::Null) {
absl::StrAppend(&str, "null");
}

void operator()(CapturingReplyBuilder::Error err) {
str = absl::StrCat(R"({"error": ")", err.first);
}

void operator()(facade::OpStatus status) {
absl::StrAppend(&str, "\"", facade::StatusToMsg(status), "\"");
}

void operator()(const CapturingReplyBuilder::StrArrPayload& sa) {
absl::StrAppend(&str, "not_implemented");
romange marked this conversation as resolved.
Show resolved Hide resolved
}

void operator()(unique_ptr<CapturingReplyBuilder::CollectionPayload> cp) {
if (!cp) {
absl::StrAppend(&str, "null");
return;
}
if (cp->len == 0 && cp->type == facade::RedisReplyBuilder::ARRAY) {
absl::StrAppend(&str, "[]");
return;
}

absl::StrAppend(&str, "[");
for (auto& pl : cp->arr) {
visit(*this, std::move(pl));
romange marked this conversation as resolved.
Show resolved Hide resolved
}
}

void operator()(facade::SinkReplyBuilder::MGetResponse resp) {
absl::StrAppend(&str, "not_implemented");
}

void operator()(const CapturingReplyBuilder::ScoredArray& sarr) {
absl::StrAppend(&str, "[");
for (const auto& [key, score] : sarr.arr) {
absl::StrAppend(&str, "{", JsonEscape(key), ":", score, "},");
}
if (sarr.arr.size() > 0) {
str.pop_back();
}
absl::StrAppend(&str, "]");
}

string str;
};

} // namespace

void HttpAPI(const http::QueryArgs& args, HttpRequest&& req, Service* service,
HttpContext* http_cntx) {
auto& body = req.body();

flexbuffers::Builder fbb;
flatbuffers::Parser parser;
flexbuffers::Reference doc;
bool success = parser.ParseFlexBuffer(body.c_str(), nullptr, &fbb);
if (success) {
fbb.Finish();
doc = flexbuffers::GetRoot(fbb.GetBuffer());
if (!IsVectorOfStrings(doc)) {
success = false;
}
}

if (!success) {
auto response = http::MakeStringResponse(h2::status::bad_request);
http::SetMime(http::kTextMime, &response);
response.body() = "Failed to parse json\r\n";
http_cntx->Invoke(std::move(response));
return;
}

vector<string> cmd_args;
flexbuffers::Vector vec = doc.AsVector();
for (size_t i = 0; i < vec.size(); ++i) {
cmd_args.push_back(vec[i].AsString().c_str());
}
vector<facade::MutableSlice> cmd_slices(cmd_args.size());
for (size_t i = 0; i < cmd_args.size(); ++i) {
cmd_slices[i] = absl::MakeSpan(cmd_args[i]);
}

facade::ConnectionContext* context = (facade::ConnectionContext*)http_cntx->user_data();
DCHECK(context);

facade::CapturingReplyBuilder reply_builder;
auto* prev = context->Inject(&reply_builder);
// TODO: to finish this.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd be nice if you added what's missing :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, it's a stale comment. I will remove it next time I touch this code.

service->DispatchCommand(absl::MakeSpan(cmd_slices), context);
facade::CapturingReplyBuilder::Payload payload = reply_builder.Take();

context->Inject(prev);
auto response = http::MakeStringResponse();
http::SetMime(http::kJsonMime, &response);

CaptureVisitor visitor;
std::visit(visitor, std::move(payload));
visitor.str.append("}\r\n");
response.body() = visitor.str;
http_cntx->Invoke(std::move(response));
}

} // namespace dfly
26 changes: 26 additions & 0 deletions src/server/http_api.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#pragma once

#include "util/http/http_handler.h"

namespace dfly {
class Service;
using HttpRequest = util::HttpListenerBase::RequestType;

/**
* @brief The main handler function for dispatching commands via HTTP.
*
* @param args - query arguments. currently not used.
* @param req - full http request including the body that should consist of a json array
* representing a Dragonfly command. aka `["set", "foo", "bar"]`
* @param service - a pointer to dfly::Service* object.
* @param http_cntxt - a pointer to the http context object which provide dragonfly context
* information via user_data() and allows to reply with HTTP responses.
*/
void HttpAPI(const util::http::QueryArgs& args, HttpRequest&& req, Service* service,
romange marked this conversation as resolved.
Show resolved Hide resolved
util::HttpContext* http_cntxt);

} // namespace dfly
13 changes: 12 additions & 1 deletion src/server/main_service.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022, DragonflyDB authors. All rights reserved.
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

Expand Down Expand Up @@ -40,6 +40,7 @@ extern "C" {
#include "server/generic_family.h"
#include "server/hll_family.h"
#include "server/hset_family.h"
#include "server/http_api.h"
#include "server/json_family.h"
#include "server/list_family.h"
#include "server/multi_command_squasher.h"
Expand Down Expand Up @@ -83,6 +84,9 @@ ABSL_FLAG(bool, admin_nopass, false,
"If set, would enable open admin access to console on the assigned port, without "
"authorization needed.");

ABSL_FLAG(bool, expose_http_api, false,
"If set, will expose a POST /api handler for sending redis commands as json array.");

ABSL_FLAG(dfly::MemoryBytesFlag, maxmemory, dfly::MemoryBytesFlag{},
"Limit on maximum-memory that is used by the database. "
"0 - means the program will automatically determine its maximum memory usage. "
Expand Down Expand Up @@ -2441,6 +2445,13 @@ void Service::ConfigureHttpHandlers(util::HttpListenerBase* base, bool is_privil
base->RegisterCb("/clusterz", [this](const http::QueryArgs& args, HttpContext* send) {
return ClusterHtmlPage(args, send, &cluster_family_);
});

if (absl::GetFlag(FLAGS_expose_http_api)) {
base->RegisterCb("/api",
[this](const http::QueryArgs& args, HttpRequest&& req, HttpContext* send) {
HttpAPI(args, std::move(req), this, send);
});
}
}

void Service::OnClose(facade::ConnectionContext* cntx) {
Expand Down
Loading
Loading