diff --git a/include/tateyama/api/server/blob_info.h b/include/tateyama/api/server/blob_info.h new file mode 100644 index 00000000..da78bafe --- /dev/null +++ b/include/tateyama/api/server/blob_info.h @@ -0,0 +1,74 @@ +/* + * Copyright 2018-2025 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include + +namespace tateyama::api::server { + +/** + * @brief an abstract super class of BLOB data from the client. + */ +class blob_info { +public: + /** + * @brief create empty object + */ + blob_info() = default; + + /** + * @brief destruct the object + */ + virtual ~blob_info() = default; + + blob_info(blob_info const& other) = default; + blob_info& operator=(blob_info const& other) = default; + blob_info(blob_info&& other) noexcept = default; + blob_info& operator=(blob_info&& other) noexcept = default; + + /** + * @brief returns the channel name of the BLOB data. + * @return the channel name + */ + [[nodiscard]] virtual std::string_view channel_name() const noexcept = 0; + + /** + * @brief returns the path of the file that represents the BLOB. + * @details If the BLOB data is temporary, the file may has been lost. + * @return the path of the file + * @see is_temporary() + */ + [[nodiscard]] virtual std::filesystem::path path() const noexcept = 0; + + /** + * @brief returns whether the file is temporary, and created in the database process. + * @details If the file is temporary, the service can remove of modify the file while processing the request. + * @return true if the BLOB data is temporary + * @return false otherwise + */ + [[nodiscard]] virtual bool is_temporary() const noexcept = 0; + + /** + * @brief disposes temporary resources underlying in this BLOB data. + * @note If the temporary resources are already disposed, this operation does nothing. + */ + virtual void dispose() = 0; +}; + +} diff --git a/include/tateyama/api/server/request.h b/include/tateyama/api/server/request.h index f459b6d5..0b9325e0 100644 --- a/include/tateyama/api/server/request.h +++ b/include/tateyama/api/server/request.h @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2025 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ #include "database_info.h" #include "session_info.h" #include "session_store.h" +#include "blob_info.h" namespace tateyama::api::server { @@ -89,6 +90,22 @@ class request { * @returns the current session variable set */ [[nodiscard]] virtual tateyama::session::session_variable_set& session_variable_set() noexcept = 0; + + /** + * @brief returns whether the request has a BLOB data with the specified channel name. + * @param channel_name the channel name + * @return true if the request has the BLOB data + * @return false otherwise + */ + [[nodiscard]] virtual bool has_blob(std::string_view channel_name) const noexcept = 0; + + /** + * @brief returns the BLOB data with the specified channel name. + * @param name the channel name + * @return the BLOB data + * @throws if there is no such the BLOB data + */ + [[nodiscard]] virtual blob_info const& get_blob(std::string_view name) const = 0; }; } diff --git a/include/tateyama/api/server/response.h b/include/tateyama/api/server/response.h index a59fe10c..f38b97e6 100644 --- a/include/tateyama/api/server/response.h +++ b/include/tateyama/api/server/response.h @@ -16,6 +16,7 @@ #pragma once #include +#include #include "data_channel.h" @@ -122,6 +123,16 @@ class response { */ [[nodiscard]] virtual bool check_cancel() const = 0; + /** + * @brief Pass the BLOB data as the response. + * @pre body_head() and body() function of this object is not yet called + * @param blob the BLOB data to pass + * @return status::ok if the BLOB data was successfully registered + * @return status::not_found if contents is not found in this BLOB data + * @return status::already_exists if another BLOB data with the same channel name is already added + */ + [[nodiscard]] virtual status add_blob(std::unique_ptr blob) = 0; + }; } diff --git a/src/tateyama/endpoint/common/endpoint_proto_utils.h b/src/tateyama/endpoint/common/endpoint_proto_utils.h index 4cdb6efe..c9981d9a 100644 --- a/src/tateyama/endpoint/common/endpoint_proto_utils.h +++ b/src/tateyama/endpoint/common/endpoint_proto_utils.h @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2025 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,12 +16,17 @@ #pragma once #include +#include +#include +#include #include #include "tateyama/framework/component_ids.h" #include #include +#include +#include #include namespace tateyama::endpoint::common { @@ -32,7 +37,7 @@ struct parse_result { std::size_t session_id_{}; }; -inline bool parse_header(std::string_view input, parse_result& result) { +inline bool parse_header(std::string_view input, parse_result& result, [[maybe_unused]] std::map>& blobs_map) { result = {}; ::tateyama::proto::framework::request::Header hdr{}; google::protobuf::io::ArrayInputStream in{input.data(), static_cast(input.size())}; @@ -41,17 +46,35 @@ inline bool parse_header(std::string_view input, parse_result& result) { } result.session_id_ = hdr.session_id(); result.service_id_ = hdr.service_id(); + if(hdr.has_blobs()) { + for(auto&& e : hdr. blobs().blobs()) { + blobs_map.emplace(e.channel_name(), std::make_pair(e.path(), e.temporary())); + } + } return utils::GetDelimitedBodyFromZeroCopyStream(std::addressof(in), nullptr, result.payload_); } struct header_content { std::size_t session_id_{}; + std::set, pointer_comp>* blobs_{}; }; inline bool append_response_header(std::stringstream& ss, std::string_view body, header_content input, ::tateyama::proto::framework::response::Header::PayloadType type = ::tateyama::proto::framework::response::Header::UNKNOWN) { ::tateyama::proto::framework::response::Header hdr{}; hdr.set_session_id(input.session_id_); hdr.set_payload_type(type); + if(input.blobs_ && type == ::tateyama::proto::framework::response::Header::SERVICE_RESULT) { + if (!(input.blobs_)->empty()) { + auto* blobs = hdr.mutable_blobs(); + for(auto&& e: *input.blobs_) { + auto* blob = blobs->add_blobs(); + auto cn = e->channel_name(); + blob->set_channel_name(cn.data(), cn.length()); + blob->set_path((e->path()).string()); + blob->set_temporary(e->is_temporary()); + } + } + } if(auto res = utils::SerializeDelimitedToOstream(hdr, std::addressof(ss)); ! res) { return false; } diff --git a/src/tateyama/endpoint/common/request.h b/src/tateyama/endpoint/common/request.h index 3b9d79a1..9bcfeab1 100644 --- a/src/tateyama/endpoint/common/request.h +++ b/src/tateyama/endpoint/common/request.h @@ -16,17 +16,43 @@ #pragma once #include +#include +#include #include #include "tateyama/status/resource/database_info_impl.h" #include "session_info_impl.h" +#include "tateyama/endpoint/common/endpoint_proto_utils.h" namespace tateyama::endpoint::common { /** * @brief request object for common_endpoint */ class request : public tateyama::api::server::request { + class blob_info_impl : public tateyama::api::server::blob_info { + public: + blob_info_impl(std::string_view channel_name, std::filesystem::path path, bool temporary) + : channel_name_(channel_name), path_(std::move(path)), temporary_(temporary) { + } + + [[nodiscard]] std::string_view channel_name() const noexcept override { + return channel_name_; + } + [[nodiscard]] std::filesystem::path path() const noexcept override { + return path_; + } + [[nodiscard]] bool is_temporary() const noexcept override { + return temporary_; + } + void dispose() override { + } + private: + const std::string channel_name_{}; + const std::filesystem::path path_{}; + const bool temporary_{}; + }; + public: explicit request(const tateyama::api::server::database_info& database_info, const tateyama::api::server::session_info& session_info, @@ -37,6 +63,14 @@ class request : public tateyama::api::server::request { local_id_(local_id), start_at_(std::chrono::system_clock::now()) { } + [[nodiscard]] std::size_t session_id() const override { + return session_id_; + } + + [[nodiscard]] std::size_t service_id() const override { + return service_id_; + } + [[nodiscard]] tateyama::api::server::database_info const& database_info() const noexcept override { return database_info_; } @@ -57,6 +91,19 @@ class request : public tateyama::api::server::request { return local_id_; } + [[nodiscard]] bool has_blob(std::string_view channel_name) const noexcept override { + return blobs_.find(std::string(channel_name)) != blobs_.end(); + } + + [[nodiscard]] tateyama::api::server::blob_info const& get_blob(std::string_view name) const override { + if(auto itr = blobs_.find(std::string(name)); itr != blobs_.end()) { + blob_info_ = std::make_unique(name, std::filesystem::path(itr->second.first), itr->second.second); + return *blob_info_; + } + throw std::runtime_error("blob not found"); + } + + [[nodiscard]] std::chrono::system_clock::time_point start_at() const noexcept { return start_at_; } @@ -70,9 +117,22 @@ class request : public tateyama::api::server::request { tateyama::session::session_variable_set& session_variable_set_; // NOLINT(cppcoreguidelines-non-private-member-variables-in-classes,misc-non-private-member-variables-in-classes) + void parse_framework_header(std::string_view message, parse_result& res) { + if (parse_header(message, res, blobs_)) { + session_id_ = res.session_id_; + service_id_ = res.service_id_; + return; + } + throw std::runtime_error("error in parse framework header"); + } + private: - std::size_t local_id_; + std::size_t session_id_{}; + std::size_t service_id_{}; + std::map> blobs_{}; + std::size_t local_id_{}; + mutable std::unique_ptr blob_info_{}; std::chrono::system_clock::time_point start_at_; }; diff --git a/src/tateyama/endpoint/common/response.h b/src/tateyama/endpoint/common/response.h index 2170df56..9b8eeddc 100644 --- a/src/tateyama/endpoint/common/response.h +++ b/src/tateyama/endpoint/common/response.h @@ -18,12 +18,14 @@ #include #include #include +#include #include #include #include #include +#include "pointer_comp.h" namespace tateyama::endpoint::common { /** @@ -34,12 +36,21 @@ class response : public tateyama::api::server::response { explicit response(std::size_t index) : index_(index) { } + void session_id(std::size_t id) override { + session_id_ = id; + } + [[nodiscard]] bool check_cancel() const override { return cancel_; } - void session_id(std::size_t id) override { - session_id_ = id; + tateyama::status add_blob(std::unique_ptr blob) override { + try { + blobs_.emplace(std::move(blob)); + return tateyama::status::ok; + } catch (std::exception &ex) { + return tateyama::status::unknown; + } } void cancel() noexcept { @@ -110,6 +121,8 @@ class response : public tateyama::api::server::response { std::atomic_bool completed_{}; // NOLINT(cppcoreguidelines-non-private-member-variables-in-classes,misc-non-private-member-variables-in-classes) + std::set, pointer_comp> blobs_{}; // NOLINT(cppcoreguidelines-non-private-member-variables-in-classes,misc-non-private-member-variables-in-classes) + void set_state(state s) { state_ = s; } diff --git a/src/tateyama/endpoint/ipc/ipc_request.cpp b/src/tateyama/endpoint/ipc/ipc_request.cpp index e844b033..42d8d9b2 100644 --- a/src/tateyama/endpoint/ipc/ipc_request.cpp +++ b/src/tateyama/endpoint/ipc/ipc_request.cpp @@ -32,12 +32,4 @@ ipc_request::dispose() { VLOG_LP(log_trace) << static_cast(&server_wire_) << " slot = " << index_; //NOLINT } -std::size_t ipc_request::session_id() const { - return session_id_; -} - -std::size_t ipc_request::service_id() const { - return service_id_; -} - } diff --git a/src/tateyama/endpoint/ipc/ipc_request.h b/src/tateyama/endpoint/ipc/ipc_request.h index 7a278160..033fe1df 100644 --- a/src/tateyama/endpoint/ipc/ipc_request.h +++ b/src/tateyama/endpoint/ipc/ipc_request.h @@ -22,7 +22,6 @@ #include #include "server_wires.h" -#include "tateyama/endpoint/common/endpoint_proto_utils.h" #include "tateyama/logging_helper.h" namespace tateyama::endpoint::ipc { @@ -54,31 +53,22 @@ class alignas(64) ipc_request : public tateyama::endpoint::common::request { message = std::string_view(long_payload_.data(), length_); } endpoint::common::parse_result res{}; - if (endpoint::common::parse_header(message, res)) { - payload_ = res.payload_; - session_id_ = res.session_id_; - service_id_ = res.service_id_; - request_wire->dispose(); - return; - } - throw std::runtime_error("error in parse framework header"); + parse_framework_header(message, res); + payload_ = res.payload_; + request_wire->dispose(); } ipc_request() = delete; [[nodiscard]] std::string_view payload() const override; void dispose(); - [[nodiscard]] std::size_t session_id() const override; - [[nodiscard]] std::size_t service_id() const override; private: server_wire_container& server_wire_; const std::size_t length_; const std::size_t index_; - std::size_t session_id_{}; - std::size_t service_id_{}; - std::string_view payload_{}; + std::string payload_{}; std::array spo_{}; std::string long_payload_{}; }; diff --git a/src/tateyama/endpoint/ipc/ipc_response.cpp b/src/tateyama/endpoint/ipc/ipc_response.cpp index 9c6c6b5d..76f7a38d 100644 --- a/src/tateyama/endpoint/ipc/ipc_response.cpp +++ b/src/tateyama/endpoint/ipc/ipc_response.cpp @@ -39,6 +39,7 @@ tateyama::status ipc_response::body(std::string_view body) { std::stringstream ss{}; endpoint::common::header_content arg{}; arg.session_id_ = session_id_; + arg.blobs_ = &blobs_; if(auto res = endpoint::common::append_response_header(ss, body, arg, ::tateyama::proto::framework::response::Header::SERVICE_RESULT); ! res) { LOG_LP(ERROR) << "error formatting response message"; return status::unknown; diff --git a/src/tateyama/endpoint/loopback/loopback_request.h b/src/tateyama/endpoint/loopback/loopback_request.h index f098d7d5..440766b8 100644 --- a/src/tateyama/endpoint/loopback/loopback_request.h +++ b/src/tateyama/endpoint/loopback/loopback_request.h @@ -86,6 +86,14 @@ class loopback_request: public tateyama::api::server::request { return session_variable_set_; } + [[nodiscard]] bool has_blob(std::string_view) const noexcept override { + return false; + } + + [[nodiscard]] tateyama::api::server::blob_info const& get_blob(std::string_view) const override { + throw std::runtime_error("blob is not supported with loopback endpoint"); + } + private: const std::size_t session_id_; const std::size_t service_id_; diff --git a/src/tateyama/endpoint/loopback/loopback_response.h b/src/tateyama/endpoint/loopback/loopback_response.h index 7591eb20..fb21133b 100644 --- a/src/tateyama/endpoint/loopback/loopback_response.h +++ b/src/tateyama/endpoint/loopback/loopback_response.h @@ -91,6 +91,13 @@ class loopback_response: public tateyama::api::server::response { return false; } + /** + * @see tateyama::server::response::add_blob() + */ + tateyama::status add_blob([[maybe_unused]] std::unique_ptr blob) override { + return tateyama::status::ok; + } + // just for unit test [[nodiscard]] std::map, std::less<>> const& all_committed_data() const noexcept { return committed_data_map_; diff --git a/src/tateyama/endpoint/stream/stream_request.cpp b/src/tateyama/endpoint/stream/stream_request.cpp index 3828d4f0..a0ce3c22 100644 --- a/src/tateyama/endpoint/stream/stream_request.cpp +++ b/src/tateyama/endpoint/stream/stream_request.cpp @@ -27,12 +27,4 @@ stream_request::payload() const { return payload_; } -std::size_t stream_request::session_id() const { - return session_id_; -} - -std::size_t stream_request::service_id() const { - return service_id_; -} - } diff --git a/src/tateyama/endpoint/stream/stream_request.h b/src/tateyama/endpoint/stream/stream_request.h index e1f6bc97..ab468235 100644 --- a/src/tateyama/endpoint/stream/stream_request.h +++ b/src/tateyama/endpoint/stream/stream_request.h @@ -20,7 +20,6 @@ #include #include "stream.h" -#include "tateyama/endpoint/common/endpoint_proto_utils.h" namespace tateyama::endpoint::stream { @@ -39,22 +38,16 @@ class alignas(64) stream_request : public tateyama::endpoint::common::request { std::size_t local_id) : tateyama::endpoint::common::request(database_info, session_info, session_store, session_variable_set, local_id), session_socket_(session_socket) { endpoint::common::parse_result res{}; - endpoint::common::parse_header(payload, res); // TODO handle error + parse_framework_header(payload, res); payload_ = res.payload_; - session_id_ = res.session_id_; - service_id_ = res.service_id_; } [[nodiscard]] std::string_view payload() const override; - [[nodiscard]] std::size_t session_id() const override; - [[nodiscard]] std::size_t service_id() const override; private: stream_socket& session_socket_; std::string payload_{}; - std::size_t session_id_{}; - std::size_t service_id_{}; }; } diff --git a/src/tateyama/endpoint/stream/stream_response.cpp b/src/tateyama/endpoint/stream/stream_response.cpp index 643b12f9..19cafe1d 100644 --- a/src/tateyama/endpoint/stream/stream_response.cpp +++ b/src/tateyama/endpoint/stream/stream_response.cpp @@ -37,6 +37,7 @@ tateyama::status stream_response::body(std::string_view body) { std::stringstream ss{}; endpoint::common::header_content arg{}; arg.session_id_ = session_id_; + arg.blobs_ = &blobs_; if(auto res = endpoint::common::append_response_header(ss, body, arg, ::tateyama::proto::framework::response::Header::SERVICE_RESULT); ! res) { LOG_LP(ERROR) << "error formatting response message"; return status::unknown; diff --git a/src/tateyama/proto/framework/common.proto b/src/tateyama/proto/framework/common.proto new file mode 100644 index 00000000..3564f6a2 --- /dev/null +++ b/src/tateyama/proto/framework/common.proto @@ -0,0 +1,24 @@ +syntax = "proto3"; + +package tateyama.proto.framework.common; + +option java_multiple_files = false; +option java_package = "com.tsurugidb.framework.proto"; +option java_outer_classname = "FrameworkCommon"; + +// Blob info. +message BlobInfo { + + // the channel name. + string channel_name = 1; + + // the file path. + string path = 2; + + // whether file is temporary. + bool temporary = 3; +} + +message RepeatedBlobInfo { + repeated BlobInfo blobs = 1; +} diff --git a/src/tateyama/proto/framework/request.proto b/src/tateyama/proto/framework/request.proto index fff8bf43..46bce6c4 100644 --- a/src/tateyama/proto/framework/request.proto +++ b/src/tateyama/proto/framework/request.proto @@ -5,6 +5,8 @@ package tateyama.proto.framework.request; option java_package = "com.tsurugidb.framework.proto"; option java_outer_classname = "FrameworkRequest"; +import "tateyama/proto/framework/common.proto"; + // common request header for clients to set and send to tateyama. message Header { // service message version (major) @@ -21,4 +23,9 @@ message Header { // session ID. uint64 session_id = 12; -} \ No newline at end of file + + // the blob info + oneof blob_opt { + common.RepeatedBlobInfo blobs = 13; + } +} diff --git a/src/tateyama/proto/framework/response.proto b/src/tateyama/proto/framework/response.proto index 0c8a4eaf..064f276f 100644 --- a/src/tateyama/proto/framework/response.proto +++ b/src/tateyama/proto/framework/response.proto @@ -5,6 +5,8 @@ package tateyama.proto.framework.response; option java_package = "com.tsurugidb.framework.proto"; option java_outer_classname = "FrameworkResponse"; +import "tateyama/proto/framework/common.proto"; + // common response header for client to receive message Header { // reserved for system use @@ -27,4 +29,10 @@ message Header { // the response payload type. PayloadType payload_type = 12; -} \ No newline at end of file + + // the blob info + // valid only if payload_type is SERVICE_RESULT + oneof blob_opt { + common.RepeatedBlobInfo blobs = 13; + } +} diff --git a/test/tateyama/utils/request_response.h b/test/tateyama/utils/request_response.h index 8977f09c..a673aabe 100644 --- a/test/tateyama/utils/request_response.h +++ b/test/tateyama/utils/request_response.h @@ -77,6 +77,14 @@ class test_request : public api::server::request { return session_variable_set_; } + bool has_blob(std::string_view) const noexcept override { + return false; + } + + tateyama::api::server::blob_info const& get_blob(std::string_view) const override { + throw std::runtime_error("blob is not supported with loopback endpoint"); + } + std::size_t session_id_{}; std::size_t service_id_{}; std::size_t local_id_{}; @@ -101,6 +109,8 @@ class test_response : public api::server::response { status acquire_channel(std::string_view name, std::shared_ptr& ch) override { return status::ok; } status release_channel(api::server::data_channel& ch) override { return status::ok; } bool check_cancel() const override { return false; } + status add_blob(std::unique_ptr blob) override { return status::ok; } + std::string& wait_and_get_body() { while (!body_arrived_) { std::this_thread::sleep_for(std::chrono::milliseconds(10));