Skip to content

Commit

Permalink
chore: .
Browse files Browse the repository at this point in the history
  • Loading branch information
romange committed Mar 24, 2024
1 parent 9c6a3fa commit 7ac6d1c
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 15 deletions.
2 changes: 1 addition & 1 deletion helio
6 changes: 3 additions & 3 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -652,11 +652,13 @@ void Connection::HandleRequests() {
http_res = CheckForHttpProto(peer);

if (http_res) {
cc_.reset(service_->CreateContext(peer, this));
if (*http_res) {
VLOG(1) << "HTTP1.1 identified";
is_http_ = true;
HttpConnection http_conn{http_listener_};
http_conn.SetSocket(peer);
http_conn.set_user_data(cc_.get());
auto ec = http_conn.ParseFromBuffer(io_buf_.InputBuffer());
io_buf_.ConsumeInput(io_buf_.InputLen());
if (!ec) {
Expand All @@ -667,17 +669,15 @@ void Connection::HandleRequests() {
// this connection.
http_conn.ReleaseSocket();
} else {
cc_.reset(service_->CreateContext(peer, this));
if (breaker_cb_) {
socket_->RegisterOnErrorCb([this](int32_t mask) { this->OnBreakCb(mask); });
}

ConnectionFlow(peer);

socket_->CancelOnErrorCb(); // noop if nothing is registered.

cc_.reset();
}
cc_.reset();
}

VLOG(1) << "Closed connection for peer " << remote_ep;
Expand Down
10 changes: 5 additions & 5 deletions src/facade/reply_capture.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,9 @@ class CapturingReplyBuilder : public RedisReplyBuilder {

void StartCollection(unsigned len, CollectionType type) override;

private:
public:
using Error = std::pair<std::string, std::string>; // SendError (msg, type)
using Null = std::nullptr_t; // SendNull or SendNullArray
struct SimpleString : public std::string {}; // SendSimpleString
struct BulkString : public std::string {}; // SendBulkString

struct StrArrPayload {
bool simple;
Expand All @@ -66,7 +64,9 @@ class CapturingReplyBuilder : public RedisReplyBuilder {
bool with_scores;
};

public:
struct SimpleString : public std::string {}; // SendSimpleString
struct BulkString : public std::string {}; // SendBulkString

CapturingReplyBuilder(ReplyMode mode = ReplyMode::FULL)
: RedisReplyBuilder{nullptr}, reply_mode_{mode}, stack_{}, current_{} {
}
Expand All @@ -89,7 +89,6 @@ class CapturingReplyBuilder : public RedisReplyBuilder {
// If an error is stored inside payload, get a reference to it.
static std::optional<ErrorRef> GetError(const Payload& pl);

private:
struct CollectionPayload {
CollectionPayload(unsigned len, CollectionType type);

Expand All @@ -98,6 +97,7 @@ class CapturingReplyBuilder : public RedisReplyBuilder {
std::vector<Payload> arr;
};

private:
private:
// Send payload directly, bypassing external interface. For efficient passing between two
// captures.
Expand Down
155 changes: 149 additions & 6 deletions src/server/http_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "server/http_api.h"

#include "base/logging.h"
#include "core/flatbuffers.h"
#include "facade/conn_context.h"
#include "facade/reply_builder.h"
Expand All @@ -14,6 +15,7 @@ namespace dfly {
using namespace util;
using namespace std;
namespace h2 = boost::beast::http;
using facade::CapturingReplyBuilder;

namespace {

Expand All @@ -35,9 +37,138 @@ bool IsValidReq(flexbuffers::Reference req) {
return true;
}

// Escape a string so that it is legal to print it in JSON text.
std::string JsonEscape(string_view input) {
auto hex_digit = [](int c) -> char { return c < 10 ? c + '0' : c - 10 + 'a'; };

string out;
out.reserve(input.size() + 2);
out.push_back('\"');

auto* p = reinterpret_cast<const unsigned char*>(input.begin());
auto* q = reinterpret_cast<const unsigned char*>(input.begin());
auto* e = reinterpret_cast<const unsigned char*>(input.end());

while (p < e) {
if (*p == '\\' || *p == '\"') {
out.push_back('\\');
out.push_back(*p++);
} else if (*p <= 0x1f) {
switch (*p) {
case '\b':
out.append("\\b");
p++;
break;
case '\f':
out.append("\\f");
p++;
break;
case '\n':
out.append("\\n");
p++;
break;
case '\r':
out.append("\\r");
p++;
break;
case '\t':
out.append("\\t");
p++;
break;
default:
// note that this if condition captures non readable chars
// with value < 32, so size = 1 byte (e.g control chars).
out.append("\\u00");
out.push_back(hex_digit((*p & 0xf0) >> 4));
out.push_back(hex_digit(*p & 0xf));
p++;
}
} else {
out.push_back(*p++);
}
}

out.push_back('\"');
}

struct CaptureVisitor {
CaptureVisitor() {
str = R"({"result":)";
}

void operator()(monostate) {
}

void operator()(long v) {
absl::StrAppend(&str, v);
}

void operator()(double v) {
absl::StrAppend(&str, v);
}

void operator()(const CapturingReplyBuilder::SimpleString& ss) {
absl::StrAppend(&str, "\"", ss, "\"");
}

void operator()(const CapturingReplyBuilder::BulkString& bs) {
absl::StrAppend(&str, JsonEscape(bs));
}

void operator()(CapturingReplyBuilder::Null) {
absl::StrAppend(&str, "null");
}

void operator()(CapturingReplyBuilder::Error err) {
str = absl::StrCat(R"({"error": ")", err.first);
}

void operator()(facade::OpStatus status) {
absl::StrAppend(&str, "\"", facade::StatusToMsg(status), "\"");
}

void operator()(const CapturingReplyBuilder::StrArrPayload& sa) {
absl::StrAppend(&str, "not_implemented");
}

void operator()(const unique_ptr<CapturingReplyBuilder::CollectionPayload>& cp) {
if (!cp) {
absl::StrAppend(&str, "null");
return;
}
if (cp->len == 0 && cp->type == facade::RedisReplyBuilder::ARRAY) {
absl::StrAppend(&str, "[]");
return;
}

absl::StrAppend(&str, "[");
for (auto& pl : cp->arr) {
visit(*this, std::move(pl));
}
}

void operator()(facade::SinkReplyBuilder::MGetResponse resp) {
absl::StrAppend(&str, "not_implemented");
}

void operator()(const CapturingReplyBuilder::ScoredArray& sarr) {
absl::StrAppend(&str, "[");
for (const auto& [key, score] : sarr.arr) {
absl::StrAppend(&str, "{", JsonEscape(key), ":", score, "},");
}
if (sarr.arr.size() > 0) {
str.pop_back();
}
absl::StrAppend(&str, "]");
}

string str;
};

} // namespace

void HttpAPI(const http::QueryArgs& args, HttpRequest&& req, Service* service, HttpContext* send) {
void HttpAPI(const http::QueryArgs& args, HttpRequest&& req, Service* service,
HttpContext* http_cntx) {
auto& body = req.body();

flexbuffers::Builder fbb;
Expand All @@ -56,7 +187,7 @@ void HttpAPI(const http::QueryArgs& args, HttpRequest&& req, Service* service, H
auto response = http::MakeStringResponse(h2::status::bad_request);
http::SetMime(http::kTextMime, &response);
response.body() = "Failed to parse json\r\n";
send->Invoke(std::move(response));
http_cntx->Invoke(std::move(response));
return;
}

Expand All @@ -70,12 +201,24 @@ void HttpAPI(const http::QueryArgs& args, HttpRequest&& req, Service* service, H
cmd_slices[i] = absl::MakeSpan(cmd_args[i]);
}

facade::ConnectionContext* context = (facade::ConnectionContext*)http_cntx->user_data();
DCHECK(context);

facade::CapturingReplyBuilder reply_builder;
auto* prev = context->Inject(&reply_builder);
// TODO: to finish this.
// service->DispatchCommand(absl::MakeSpan(cmd_slices), ctx.get());
service->DispatchCommand(absl::MakeSpan(cmd_slices), context);
facade::CapturingReplyBuilder::Payload payload = reply_builder.Take();

context->Inject(prev);
auto response = http::MakeStringResponse();
http::SetMime(http::kTextMime, &response);
response.body() = "TBD\r\n";
send->Invoke(std::move(response));
http::SetMime(http::kJsonMime, &response);

CaptureVisitor visitor;
std::visit(visitor, std::move(payload));
visitor.str.append("}\r\n");
response.body() = visitor.str;
http_cntx->Invoke(std::move(response));
}

} // namespace dfly

0 comments on commit 7ac6d1c

Please sign in to comment.