Skip to content

Commit

Permalink
apacheGH-43543: [FlightRPC][C++] Reduce the number of references to p…
Browse files Browse the repository at this point in the history
…rotobuf::Any (apache#43544)

### Rationale for this change

The less code depends on `protobuf::Any`, the easier it is to separate the parts that need to link against pb libraries from the ones that can be built without it.

### What changes are included in this PR?

 - Internal forward declarations (plus making some dtor/ctor non-inline)
 - Add missing `SchemaResult::FromProto`
 - Extract a few proto-related functions
 - Move more generic code from `flight/sql` to `sql`

### Are these changes tested?

By existing tests.
* GitHub Issue: apache#43543

Authored-by: Felipe Oliveira Carvalho <[email protected]>
Signed-off-by: Felipe Oliveira Carvalho <[email protected]>
  • Loading branch information
felipecrv authored Aug 8, 2024
1 parent 3420c0d commit ee3273e
Show file tree
Hide file tree
Showing 9 changed files with 219 additions and 162 deletions.
5 changes: 2 additions & 3 deletions cpp/src/arrow/flight/flight_internals_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,8 @@ void TestRoundtrip(const std::vector<FlightType>& values,
ASSERT_OK(internal::FromProto(pb_value, &info_data));
EXPECT_EQ(values[i], FlightInfo{std::move(info_data)});
} else if constexpr (std::is_same_v<FlightType, SchemaResult>) {
std::string data;
ASSERT_OK(internal::FromProto(pb_value, &data));
SchemaResult value(std::move(data));
SchemaResult value;
ASSERT_OK(internal::FromProto(pb_value, &value));
EXPECT_EQ(values[i], value);
} else {
FlightType value;
Expand Down
58 changes: 56 additions & 2 deletions cpp/src/arrow/flight/serialization_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
#include <memory>
#include <string>

#include <google/protobuf/any.pb.h>

#include "arrow/buffer.h"
#include "arrow/flight/protocol_internal.h"
#include "arrow/io/memory.h"
#include "arrow/ipc/reader.h"
#include "arrow/ipc/writer.h"
Expand All @@ -39,6 +42,57 @@ namespace arrow {
namespace flight {
namespace internal {

namespace {

Status PackToAnyAndSerialize(const google::protobuf::Message& command, std::string* out) {
google::protobuf::Any any;
#if PROTOBUF_VERSION >= 3015000
if (!any.PackFrom(command)) {
return Status::SerializationError("Failed to pack ", command.GetTypeName());
}
#else
any.PackFrom(command);
#endif

#if PROTOBUF_VERSION >= 3015000
if (!any.SerializeToString(out)) {
return Status::SerializationError("Failed to serialize ", command.GetTypeName());
}
#else
any.SerializeToString(out);
#endif
return Status::OK();
}

} // namespace

Status PackProtoCommand(const google::protobuf::Message& command, FlightDescriptor* out) {
std::string buf;
RETURN_NOT_OK(PackToAnyAndSerialize(command, &buf));
*out = FlightDescriptor::Command(std::move(buf));
return Status::OK();
}

Status PackProtoAction(std::string action_type, const google::protobuf::Message& action,
Action* out) {
std::string buf;
RETURN_NOT_OK(PackToAnyAndSerialize(action, &buf));
out->type = std::move(action_type);
out->body = Buffer::FromString(std::move(buf));
return Status::OK();
}

Status UnpackProtoAction(const Action& action, google::protobuf::Message* out) {
google::protobuf::Any any;
if (!any.ParseFromArray(action.body->data(), static_cast<int>(action.body->size()))) {
return Status::Invalid("Unable to parse action ", action.type);
}
if (!any.UnpackTo(out)) {
return Status::Invalid("Unable to unpack ", out->GetTypeName());
}
return Status::OK();
}

// Timestamp

Status FromProto(const google::protobuf::Timestamp& pb_timestamp, Timestamp* timestamp) {
Expand Down Expand Up @@ -282,8 +336,8 @@ Status FromProto(const pb::BasicAuth& pb_basic_auth, BasicAuth* basic_auth) {
return Status::OK();
}

Status FromProto(const pb::SchemaResult& pb_result, std::string* result) {
*result = pb_result.schema();
Status FromProto(const pb::SchemaResult& pb_result, SchemaResult* result) {
*result = SchemaResult{pb_result.schema()};
return Status::OK();
}

Expand Down
60 changes: 57 additions & 3 deletions cpp/src/arrow/flight/serialization_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@

#pragma once

#include "arrow/flight/protocol_internal.h" // IWYU pragma: keep
#include "arrow/flight/transport.h"
#include "arrow/flight/type_fwd.h"
#include "arrow/flight/types.h"
#include "arrow/util/macros.h"
#include "arrow/flight/visibility.h"

namespace google::protobuf {
class Message;
class Timestamp;
} // namespace google::protobuf

namespace arrow {

Expand All @@ -34,6 +39,32 @@ class Message;
} // namespace ipc

namespace flight {
// Protobuf types from Flight.proto
namespace protocol {
class Action;
class ActionType;
class BasicAuth;
class CancelFlightInfoRequest;
class CancelFlightInfoResult;
class Criteria;
class FlightData;
class FlightDescriptor;
class FlightEndpoint;
class FlightInfo;
class GetSessionOptionsRequest;
class Location;
class PollInfo;
class RenewFlightEndpointRequest;
class Result;
class SchemaResult;
class SetSessionOptionsRequest;
class SetSessionOptionsResult;
class Ticket;
class GetSessionOptionsRequest;
class GetSessionOptionsResult;
class CloseSessionRequest;
class CloseSessionResult;
} // namespace protocol
namespace pb = arrow::flight::protocol;
namespace internal {

Expand All @@ -43,6 +74,29 @@ static constexpr char kAuthHeader[] = "authorization";
ARROW_FLIGHT_EXPORT
Status SchemaToString(const Schema& schema, std::string* out);

/// \brief Wraps a protobuf message representing a Flight command in a FlightDescriptor.
///
/// A `FlightDescriptor` can carry a string representing a command in any
/// format the implementation desires. A common pattern in Flight implementations
/// is to wrap a message in a `protobuf::Any` message, which is then serialized
/// into the string of the `FlightDescriptor.`
ARROW_FLIGHT_EXPORT
Status PackProtoCommand(const google::protobuf::Message& command, FlightDescriptor* out);

/// \brief Wraps a protobuf message representing a Flight action.
///
/// A Flight action can carry a string representing an action in any format the
/// implementation desires. A common pattern in Flight implementations is to
/// wrap a message in a `protobuf::Any` message, which is then serialized into
/// the string of the `Action.`
ARROW_FLIGHT_EXPORT
Status PackProtoAction(std::string action_type, const google::protobuf::Message& action,
Action* out);

/// \brief Unpacks a protobuf message packed by PackProtoAction.
ARROW_FLIGHT_EXPORT
Status UnpackProtoAction(const Action& action, google::protobuf::Message* out);

// These functions depend on protobuf types which are not exported in the Flight DLL.

Status FromProto(const google::protobuf::Timestamp& pb_timestamp, Timestamp* timestamp);
Expand All @@ -66,7 +120,7 @@ Status FromProto(const pb::PollInfo& pb_info, PollInfo* info);
Status FromProto(const pb::PollInfo& pb_info, std::unique_ptr<PollInfo>* info);
Status FromProto(const pb::CancelFlightInfoRequest& pb_request,
CancelFlightInfoRequest* request);
Status FromProto(const pb::SchemaResult& pb_result, std::string* result);
Status FromProto(const pb::SchemaResult& pb_result, SchemaResult* result);
Status FromProto(const pb::BasicAuth& pb_basic_auth, BasicAuth* info);
Status FromProto(const pb::SetSessionOptionsRequest& pb_request,
SetSessionOptionsRequest* request);
Expand Down
Loading

0 comments on commit ee3273e

Please sign in to comment.