Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dubbo_proxy: Implement the routing of Dubbo requests #5973

Merged
merged 23 commits into from
Mar 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
61fb32a
Implement the routing of Dubbo requests
gengleilei Feb 15, 2019
95dc2b5
Add missing files
gengleilei Feb 15, 2019
8232308
Delete the testing::Test code
gengleilei Feb 15, 2019
bd5a67d
Fix formatting errors
gengleilei Feb 15, 2019
48b834e
Remove the AppExceptionType definition and fixed a naming conflict fo…
gengleilei Feb 15, 2019
cc34c53
Add some comments to serializeRpcResult
gengleilei Feb 18, 2019
788706b
Modify the onResetStream function implementation
gengleilei Feb 18, 2019
3c16f0f
Add the resetStream interface to DecoderFilterCallbacks
gengleilei Feb 20, 2019
f15e8d8
Remove useless code
gengleilei Feb 20, 2019
f3dfee0
Rename router_matcher to route_matcher
gengleilei Feb 21, 2019
2e071eb
Add the continue decoding logic when the connection fails
gengleilei Feb 21, 2019
a0c81f4
Remove the use of buffered_request_body_
gengleilei Feb 21, 2019
0837aba
Fixed body_size setting error in AppException encoding
gengleilei Feb 22, 2019
2376ed9
Fix body_size calculation error in AppException encoding
gengleilei Feb 22, 2019
379aeba
Fix the RpcResponseType spelling error and optimize the AppException …
gengleilei Feb 22, 2019
a9fc68b
Add some unit test cases
gengleilei Feb 27, 2019
d9eb2c1
Revert "Add some unit test cases"
gengleilei Feb 28, 2019
dac7c18
Revert "Revert "Add some unit test cases""
gengleilei Mar 4, 2019
5352fa0
Add some unit test cases.
gengleilei Mar 5, 2019
c9f1567
Optimize the code
gengleilei Mar 6, 2019
4ecaa7a
Optimize some type definitions
gengleilei Mar 8, 2019
8f66806
Add some unit test cases.
gengleilei Mar 8, 2019
1fe2cd6
Change the use of string_view
gengleilei Mar 13, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions api/envoy/config/filter/dubbo/router/v2alpha1/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
load("//bazel:api_build_system.bzl", "api_proto_library_internal")

licenses(["notice"]) # Apache 2

api_proto_library_internal(
name = "router",
srcs = ["router.proto"],
)
14 changes: 14 additions & 0 deletions api/envoy/config/filter/dubbo/router/v2alpha1/router.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
syntax = "proto3";

package envoy.config.filter.dubbo.router.v2alpha1;

option java_outer_classname = "RouterProto";
option java_multiple_files = true;
option java_package = "io.envoyproxy.envoy.config.filter.dubbo.router.v2alpha1";
option go_package = "v2alpha1";

// [#protodoc-title: Router]
// Dubbo router :ref:`configuration overview <config_dubbo_filters_router>`.

message Router {
}
17 changes: 16 additions & 1 deletion source/extensions/filters/network/dubbo_proxy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ envoy_cc_library(
deps = [
":buffer_helper_lib",
":message_lib",
":metadata_lib",
"//source/common/common:assert_lib",
"//source/common/config:utility_lib",
"//source/common/singleton:const_singleton",
Expand Down Expand Up @@ -133,7 +134,6 @@ envoy_cc_library(
external_deps = ["abseil_optional"],
deps = [
":message_lib",
":protocol_interface",
"//source/common/http:header_map_lib",
],
)
Expand Down Expand Up @@ -161,3 +161,18 @@ envoy_cc_library(
"//include/envoy/stats:stats_macros",
],
)

envoy_cc_library(
name = "app_exception_lib",
srcs = ["app_exception.cc"],
hdrs = ["app_exception.h"],
deps = [
":deserializer_interface",
":message_lib",
":metadata_lib",
":protocol_interface",
"//include/envoy/buffer:buffer_interface",
"//source/common/buffer:buffer_lib",
"//source/extensions/filters/network/dubbo_proxy/filters:filter_interface",
],
)
44 changes: 44 additions & 0 deletions source/extensions/filters/network/dubbo_proxy/app_exception.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#include "extensions/filters/network/dubbo_proxy/app_exception.h"

#include "common/buffer/buffer_impl.h"

#include "extensions/filters/network/dubbo_proxy/message.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace DubboProxy {

AppException::AppException(ResponseStatus status, const std::string& what)
: EnvoyException(what), status_(status),
response_type_(RpcResponseType::ResponseWithException) {}

AppException::ResponseType AppException::encode(MessageMetadata& metadata,
DubboProxy::Protocol& protocol,
Deserializer& deserializer,
Buffer::Instance& buffer) const {
ASSERT(buffer.length() == 0);

ENVOY_LOG(debug, "err {}", what());

// Serialize the response content to get the serialized response length.
const std::string& response = what();
size_t serialized_body_size = deserializer.serializeRpcResult(buffer, response, response_type_);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to check whether the response_type_ meets the requirements, it can carry the body

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

esponse_type_ has been assigned ResponseWithException in the constructor, I'll add an ASSERT.


metadata.setResponseStatus(status_);
metadata.setMessageType(MessageType::Response);

Buffer::OwnedImpl protocol_buffer;
if (!protocol.encode(protocol_buffer, serialized_body_size, metadata)) {
throw EnvoyException("failed to encode local reply message");
}

buffer.prepend(protocol_buffer);

return DirectResponse::ResponseType::Exception;
}

} // namespace DubboProxy
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
32 changes: 32 additions & 0 deletions source/extensions/filters/network/dubbo_proxy/app_exception.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#pragma once

#include "envoy/common/exception.h"

#include "extensions/filters/network/dubbo_proxy/deserializer.h"
#include "extensions/filters/network/dubbo_proxy/filters/filter.h"
#include "extensions/filters/network/dubbo_proxy/metadata.h"
#include "extensions/filters/network/dubbo_proxy/protocol.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace DubboProxy {

struct AppException : public EnvoyException,
public DubboFilters::DirectResponse,
Logger::Loggable<Logger::Id::dubbo> {
AppException(ResponseStatus status, const std::string& what);
AppException(const AppException& ex) = default;

using ResponseType = DubboFilters::DirectResponse::ResponseType;
ResponseType encode(MessageMetadata& metadata, Protocol& protocol, Deserializer& deserializer,
Buffer::Instance& buffer) const override;

const ResponseStatus status_;
const RpcResponseType response_type_;
};

} // namespace DubboProxy
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
12 changes: 12 additions & 0 deletions source/extensions/filters/network/dubbo_proxy/deserializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,18 @@ class Deserializer {
* @throws EnvoyException if the data is not valid for this serialization
*/
virtual RpcResultPtr deserializeRpcResult(Buffer::Instance& buffer, size_t body_size) PURE;

/**
* serialize result of an rpc call
* If successful, the output_buffer is written to the serialized data
*
* @param output_buffer store the serialized data
* @param content the rpc response content
* @param type the rpc response type
* @return size_t the length of the serialized content
*/
virtual size_t serializeRpcResult(Buffer::Instance& output_buffer, const std::string& content,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to return the serialized size? Take the buffer size before serialization minus the serialized buffer size.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fine, but it's much more user-friendly and convenient for the user to explicitly return the serialization length.

RpcResponseType type) PURE;
};

typedef std::unique_ptr<Deserializer> DeserializerPtr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,36 @@ bool isValidResponseStatus(ResponseStatus status) {
return true;
}

void parseRequestInfoFromBuffer(Buffer::Instance& data, MessageMetadataSharedPtr metadata) {
ASSERT(data.length() >= DubboProtocolImpl::MessageSize);
uint8_t flag = data.peekInt<uint8_t>(FlagOffset);
bool is_two_way = (flag & TwoWayMask) == TwoWayMask ? true : false;
SerializationType type = static_cast<SerializationType>(flag & SerializationTypeMask);
if (!isValidSerializationType(type)) {
throw EnvoyException(
fmt::format("invalid dubbo message serialization type {}",
static_cast<std::underlying_type<SerializationType>::type>(type)));
}

if (!is_two_way) {
metadata->setMessageType(MessageType::Oneway);
}

metadata->setSerializationType(type);
}

void parseResponseInfoFromBuffer(Buffer::Instance& buffer, MessageMetadataSharedPtr metadata) {
ASSERT(buffer.length() >= DubboProtocolImpl::MessageSize);
ResponseStatus status = static_cast<ResponseStatus>(buffer.peekInt<uint8_t>(StatusOffset));
if (!isValidResponseStatus(status)) {
throw EnvoyException(
fmt::format("invalid dubbo message response status {}",
static_cast<std::underlying_type<ResponseStatus>::type>(status)));
}

metadata->setResponseStatus(status);
}

void RequestMessageImpl::fromBuffer(Buffer::Instance& data) {
ASSERT(data.length() >= DubboProtocolImpl::MessageSize);
uint8_t flag = data.peekInt<uint8_t>(FlagOffset);
Expand All @@ -76,6 +106,8 @@ void ResponseMessageImpl::fromBuffer(Buffer::Instance& buffer) {
}

bool DubboProtocolImpl::decode(Buffer::Instance& buffer, Protocol::Context* context) {
ASSERT(callbacks_);

if (buffer.length() < DubboProtocolImpl::MessageSize) {
return false;
}
Expand Down Expand Up @@ -103,18 +135,80 @@ bool DubboProtocolImpl::decode(Buffer::Instance& buffer, Protocol::Context* cont
std::make_unique<RequestMessageImpl>(request_id, body_size, is_event);
req->fromBuffer(buffer);
context->is_request_ = true;
callbacks_.onRequestMessage(std::move(req));
callbacks_->onRequestMessage(std::move(req));
} else {
ResponseMessageImplPtr res =
std::make_unique<ResponseMessageImpl>(request_id, body_size, is_event);
res->fromBuffer(buffer);
callbacks_.onResponseMessage(std::move(res));
callbacks_->onResponseMessage(std::move(res));
}

buffer.drain(MessageSize);
return true;
}

bool DubboProtocolImpl::decode(Buffer::Instance& buffer, Protocol::Context* context,
MessageMetadataSharedPtr metadata) {
if (!metadata) {
throw EnvoyException("invalid metadata parameter");
}

if (buffer.length() < DubboProtocolImpl::MessageSize) {
return false;
}

uint16_t magic_number = buffer.peekBEInt<uint16_t>();
if (magic_number != MagicNumber) {
throw EnvoyException(fmt::format("invalid dubbo message magic number {}", magic_number));
}

uint8_t flag = buffer.peekInt<uint8_t>(FlagOffset);
MessageType type =
(flag & MessageTypeMask) == MessageTypeMask ? MessageType::Request : MessageType::Response;
bool is_event = (flag & EventMask) == EventMask ? true : false;
int64_t request_id = buffer.peekBEInt<int64_t>(RequestIDOffset);
int32_t body_size = buffer.peekBEInt<int32_t>(BodySizeOffset);

if (body_size > MaxBodySize || body_size <= 0) {
throw EnvoyException(fmt::format("invalid dubbo message size {}", body_size));
}

metadata->setMessageType(type);
metadata->setRequestId(request_id);

if (type == MessageType::Request) {
parseRequestInfoFromBuffer(buffer, metadata);
} else {
parseResponseInfoFromBuffer(buffer, metadata);
}

context->header_size_ = DubboProtocolImpl::MessageSize;
context->body_size_ = body_size;
context->is_heartbeat_ = is_event;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is the logic for heartbeat message processing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The heartbeat processing logic is in the Decoder and ConnectionManager, which is not included in this submission.


return true;
}

bool DubboProtocolImpl::encode(Buffer::Instance& buffer, int32_t body_size,
const MessageMetadata& metadata) {
switch (metadata.message_type()) {
case MessageType::Response: {
ASSERT(metadata.response_status().has_value());
buffer.writeBEInt<uint16_t>(MagicNumber);
buffer.writeByte(static_cast<uint8_t>(metadata.serialization_type()));
buffer.writeByte(static_cast<uint8_t>(metadata.response_status().value()));
buffer.writeBEInt<uint64_t>(metadata.request_id());
buffer.writeBEInt<uint32_t>(body_size);
return true;
}
case MessageType::Request: {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}
default:
NOT_REACHED_GCOVR_EXCL_LINE;
}
}

class DubboProtocolConfigFactory : public ProtocolFactoryBase<DubboProtocolImpl> {
public:
DubboProtocolConfigFactory() : ProtocolFactoryBase(ProtocolType::Dubbo) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,21 @@ typedef std::unique_ptr<ResponseMessageImpl> ResponseMessageImplPtr;

class DubboProtocolImpl : public Protocol {
public:
DubboProtocolImpl(ProtocolCallbacks& callbacks) : callbacks_(callbacks) {}
DubboProtocolImpl() = default;
explicit DubboProtocolImpl(ProtocolCallbacks* callbacks) : callbacks_(callbacks) {}
const std::string& name() const override { return ProtocolNames::get().fromType(type()); }
ProtocolType type() const override { return ProtocolType::Dubbo; }
virtual bool decode(Buffer::Instance& buffer, Protocol::Context* context) override;
bool decode(Buffer::Instance& buffer, Protocol::Context* context) override;
bool decode(Buffer::Instance& buffer, Protocol::Context* context,
MessageMetadataSharedPtr metadata) override;
bool encode(Buffer::Instance& buffer, int32_t body_size,
const MessageMetadata& metadata) override;

static constexpr uint8_t MessageSize = 16;
static constexpr int32_t MaxBodySize = 16 * 1024 * 1024;

private:
ProtocolCallbacks& callbacks_;
ProtocolCallbacks* callbacks_;
};

} // namespace DubboProxy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ class DecoderFilterCallbacks {
* @return StreamInfo for logging purposes.
*/
virtual StreamInfo::StreamInfo& streamInfo() PURE;

/**
* Reset the underlying stream.
*/
virtual void resetStream() PURE;
};

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,6 @@ namespace Extensions {
namespace NetworkFilters {
namespace DubboProxy {

enum class RpcResponseType : uint8_t {
ResponseWithException = 0,
ResponseWithValue = 1,
ResponseWithNullValue = 2,
ResponseWithExceptionWithAttachments = 3,
ResponseValueWithAttachments = 4,
ResponseNullValueWithAttachments = 5,
};

RpcInvocationPtr HessianDeserializerImpl::deserializeRpcInvocation(Buffer::Instance& buffer,
size_t body_size) {
ASSERT(buffer.length() >= body_size);
Expand Down Expand Up @@ -57,9 +48,9 @@ RpcResultPtr HessianDeserializerImpl::deserializeRpcResult(Buffer::Instance& buf
switch (type) {
case RpcResponseType::ResponseWithException:
case RpcResponseType::ResponseWithExceptionWithAttachments:
case RpcResponseType::ResponseWithValue:
result = std::make_unique<RpcResultImpl>(true);
break;
case RpcResponseType::ResponseWithValue:
case RpcResponseType::ResponseWithNullValue:
has_value = false;
FALLTHRU;
Expand All @@ -85,6 +76,23 @@ RpcResultPtr HessianDeserializerImpl::deserializeRpcResult(Buffer::Instance& buf
return result;
}

size_t HessianDeserializerImpl::serializeRpcResult(Buffer::Instance& output_buffer,
const std::string& content,
RpcResponseType type) {
size_t origin_length = output_buffer.length();

// The serialized response type is compact int.
size_t serialized_size = HessianUtils::writeInt(
output_buffer, static_cast<std::underlying_type<RpcResponseType>::type>(type));

// Serialized response content.
serialized_size += HessianUtils::writeString(output_buffer, content);

ASSERT((output_buffer.length() - origin_length) == serialized_size);

return serialized_size;
}

class HessianDeserializerConfigFactory : public DeserializerFactoryBase<HessianDeserializerImpl> {
public:
HessianDeserializerConfigFactory() : DeserializerFactoryBase(SerializationType::Hessian) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ class HessianDeserializerImpl : public Deserializer {
virtual RpcInvocationPtr deserializeRpcInvocation(Buffer::Instance& buffer,
size_t body_size) override;
virtual RpcResultPtr deserializeRpcResult(Buffer::Instance& buffer, size_t body_size) override;
virtual size_t serializeRpcResult(Buffer::Instance& output_buffer, const std::string& content,
RpcResponseType type) override;
};

} // namespace DubboProxy
Expand Down
Loading