Skip to content

Commit

Permalink
Merge pull request #254 from project-tsurugi/wip/i_1113
Browse files Browse the repository at this point in the history
implement blob
  • Loading branch information
t-horikawa authored Jan 17, 2025
2 parents f01c7ef + dac1c63 commit 55f0990
Show file tree
Hide file tree
Showing 18 changed files with 277 additions and 46 deletions.
74 changes: 74 additions & 0 deletions include/tateyama/api/server/blob_info.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 2018-2025 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <string_view>
#include <filesystem>
#include <cstddef>
#include <tateyama/status.h>

namespace tateyama::api::server {

/**
* @brief an abstract super class of BLOB data from the client.
*/
class blob_info {
public:
/**
* @brief create empty object
*/
blob_info() = default;

/**
* @brief destruct the object
*/
virtual ~blob_info() = default;

blob_info(blob_info const& other) = default;
blob_info& operator=(blob_info const& other) = default;
blob_info(blob_info&& other) noexcept = default;
blob_info& operator=(blob_info&& other) noexcept = default;

/**
* @brief returns the channel name of the BLOB data.
* @return the channel name
*/
[[nodiscard]] virtual std::string_view channel_name() const noexcept = 0;

/**
* @brief returns the path of the file that represents the BLOB.
* @details If the BLOB data is temporary, the file may has been lost.
* @return the path of the file
* @see is_temporary()
*/
[[nodiscard]] virtual std::filesystem::path path() const noexcept = 0;

/**
* @brief returns whether the file is temporary, and created in the database process.
* @details If the file is temporary, the service can remove of modify the file while processing the request.
* @return true if the BLOB data is temporary
* @return false otherwise
*/
[[nodiscard]] virtual bool is_temporary() const noexcept = 0;

/**
* @brief disposes temporary resources underlying in this BLOB data.
* @note If the temporary resources are already disposed, this operation does nothing.
*/
virtual void dispose() = 0;
};

}
19 changes: 18 additions & 1 deletion include/tateyama/api/server/request.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2025 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@
#include "database_info.h"
#include "session_info.h"
#include "session_store.h"
#include "blob_info.h"

namespace tateyama::api::server {

Expand Down Expand Up @@ -89,6 +90,22 @@ class request {
* @returns the current session variable set
*/
[[nodiscard]] virtual tateyama::session::session_variable_set& session_variable_set() noexcept = 0;

/**
* @brief returns whether the request has a BLOB data with the specified channel name.
* @param channel_name the channel name
* @return true if the request has the BLOB data
* @return false otherwise
*/
[[nodiscard]] virtual bool has_blob(std::string_view channel_name) const noexcept = 0;

/**
* @brief returns the BLOB data with the specified channel name.
* @param name the channel name
* @return the BLOB data
* @throws if there is no such the BLOB data
*/
[[nodiscard]] virtual blob_info const& get_blob(std::string_view name) const = 0;
};

}
11 changes: 11 additions & 0 deletions include/tateyama/api/server/response.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#pragma once

#include <tateyama/proto/diagnostics.pb.h>
#include <tateyama/api/server/blob_info.h>

#include "data_channel.h"

Expand Down Expand Up @@ -122,6 +123,16 @@ class response {
*/
[[nodiscard]] virtual bool check_cancel() const = 0;

/**
* @brief Pass the BLOB data as the response.
* @pre body_head() and body() function of this object is not yet called
* @param blob the BLOB data to pass
* @return status::ok if the BLOB data was successfully registered
* @return status::not_found if contents is not found in this BLOB data
* @return status::already_exists if another BLOB data with the same channel name is already added
*/
[[nodiscard]] virtual status add_blob(std::unique_ptr<blob_info> blob) = 0;

};

}
27 changes: 25 additions & 2 deletions src/tateyama/endpoint/common/endpoint_proto_utils.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2025 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,12 +16,17 @@
#pragma once

#include <string_view>
#include <set>
#include <map>
#include <utility>

#include <tateyama/api/server/request.h>

#include "tateyama/framework/component_ids.h"
#include <tateyama/proto/framework/request.pb.h>
#include <tateyama/proto/framework/response.pb.h>
#include <tateyama/api/server/blob_info.h>
#include <tateyama/endpoint/common/pointer_comp.h>
#include <tateyama/utils/protobuf_utils.h>

namespace tateyama::endpoint::common {
Expand All @@ -32,7 +37,7 @@ struct parse_result {
std::size_t session_id_{};
};

inline bool parse_header(std::string_view input, parse_result& result) {
inline bool parse_header(std::string_view input, parse_result& result, [[maybe_unused]] std::map<std::string, std::pair<std::filesystem::path, bool>>& blobs_map) {
result = {};
::tateyama::proto::framework::request::Header hdr{};
google::protobuf::io::ArrayInputStream in{input.data(), static_cast<int>(input.size())};
Expand All @@ -41,17 +46,35 @@ inline bool parse_header(std::string_view input, parse_result& result) {
}
result.session_id_ = hdr.session_id();
result.service_id_ = hdr.service_id();
if(hdr.has_blobs()) {
for(auto&& e : hdr. blobs().blobs()) {
blobs_map.emplace(e.channel_name(), std::make_pair(e.path(), e.temporary()));
}
}
return utils::GetDelimitedBodyFromZeroCopyStream(std::addressof(in), nullptr, result.payload_);
}

struct header_content {
std::size_t session_id_{};
std::set<std::unique_ptr<tateyama::api::server::blob_info>, pointer_comp<tateyama::api::server::blob_info>>* blobs_{};
};

inline bool append_response_header(std::stringstream& ss, std::string_view body, header_content input, ::tateyama::proto::framework::response::Header::PayloadType type = ::tateyama::proto::framework::response::Header::UNKNOWN) {
::tateyama::proto::framework::response::Header hdr{};
hdr.set_session_id(input.session_id_);
hdr.set_payload_type(type);
if(input.blobs_ && type == ::tateyama::proto::framework::response::Header::SERVICE_RESULT) {
if (!(input.blobs_)->empty()) {
auto* blobs = hdr.mutable_blobs();
for(auto&& e: *input.blobs_) {
auto* blob = blobs->add_blobs();
auto cn = e->channel_name();
blob->set_channel_name(cn.data(), cn.length());
blob->set_path((e->path()).string());
blob->set_temporary(e->is_temporary());
}
}
}
if(auto res = utils::SerializeDelimitedToOstream(hdr, std::addressof(ss)); ! res) {
return false;
}
Expand Down
62 changes: 61 additions & 1 deletion src/tateyama/endpoint/common/request.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,43 @@
#pragma once

#include <chrono>
#include <string_view>
#include <filesystem>

#include <tateyama/api/server/request.h>

#include "tateyama/status/resource/database_info_impl.h"
#include "session_info_impl.h"
#include "tateyama/endpoint/common/endpoint_proto_utils.h"

namespace tateyama::endpoint::common {
/**
* @brief request object for common_endpoint
*/
class request : public tateyama::api::server::request {
class blob_info_impl : public tateyama::api::server::blob_info {
public:
blob_info_impl(std::string_view channel_name, std::filesystem::path path, bool temporary)
: channel_name_(channel_name), path_(std::move(path)), temporary_(temporary) {
}

[[nodiscard]] std::string_view channel_name() const noexcept override {
return channel_name_;
}
[[nodiscard]] std::filesystem::path path() const noexcept override {
return path_;
}
[[nodiscard]] bool is_temporary() const noexcept override {
return temporary_;
}
void dispose() override {
}
private:
const std::string channel_name_{};
const std::filesystem::path path_{};
const bool temporary_{};
};

public:
explicit request(const tateyama::api::server::database_info& database_info,
const tateyama::api::server::session_info& session_info,
Expand All @@ -37,6 +63,14 @@ class request : public tateyama::api::server::request {
local_id_(local_id), start_at_(std::chrono::system_clock::now()) {
}

[[nodiscard]] std::size_t session_id() const override {
return session_id_;
}

[[nodiscard]] std::size_t service_id() const override {
return service_id_;
}

[[nodiscard]] tateyama::api::server::database_info const& database_info() const noexcept override {
return database_info_;
}
Expand All @@ -57,6 +91,19 @@ class request : public tateyama::api::server::request {
return local_id_;
}

[[nodiscard]] bool has_blob(std::string_view channel_name) const noexcept override {
return blobs_.find(std::string(channel_name)) != blobs_.end();
}

[[nodiscard]] tateyama::api::server::blob_info const& get_blob(std::string_view name) const override {
if(auto itr = blobs_.find(std::string(name)); itr != blobs_.end()) {
blob_info_ = std::make_unique<blob_info_impl>(name, std::filesystem::path(itr->second.first), itr->second.second);
return *blob_info_;
}
throw std::runtime_error("blob not found");
}


[[nodiscard]] std::chrono::system_clock::time_point start_at() const noexcept {
return start_at_;
}
Expand All @@ -70,9 +117,22 @@ class request : public tateyama::api::server::request {

tateyama::session::session_variable_set& session_variable_set_; // NOLINT(cppcoreguidelines-non-private-member-variables-in-classes,misc-non-private-member-variables-in-classes)

void parse_framework_header(std::string_view message, parse_result& res) {
if (parse_header(message, res, blobs_)) {
session_id_ = res.session_id_;
service_id_ = res.service_id_;
return;
}
throw std::runtime_error("error in parse framework header");
}

private:
std::size_t local_id_;
std::size_t session_id_{};
std::size_t service_id_{};
std::map<std::string, std::pair<std::filesystem::path, bool>> blobs_{};

std::size_t local_id_{};
mutable std::unique_ptr<blob_info_impl> blob_info_{};
std::chrono::system_clock::time_point start_at_;
};

Expand Down
17 changes: 15 additions & 2 deletions src/tateyama/endpoint/common/response.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
#include <atomic>
#include <memory>
#include <string_view>
#include <exception>

#include <glog/logging.h>
#include <tateyama/logging.h>
#include <tateyama/logging_helper.h>

#include <tateyama/api/server/response.h>
#include "pointer_comp.h"

namespace tateyama::endpoint::common {
/**
Expand All @@ -34,12 +36,21 @@ class response : public tateyama::api::server::response {
explicit response(std::size_t index) : index_(index) {
}

void session_id(std::size_t id) override {
session_id_ = id;
}

[[nodiscard]] bool check_cancel() const override {
return cancel_;
}

void session_id(std::size_t id) override {
session_id_ = id;
tateyama::status add_blob(std::unique_ptr<tateyama::api::server::blob_info> blob) override {
try {
blobs_.emplace(std::move(blob));
return tateyama::status::ok;
} catch (std::exception &ex) {
return tateyama::status::unknown;
}
}

void cancel() noexcept {
Expand Down Expand Up @@ -110,6 +121,8 @@ class response : public tateyama::api::server::response {

std::atomic_bool completed_{}; // NOLINT(cppcoreguidelines-non-private-member-variables-in-classes,misc-non-private-member-variables-in-classes)

std::set<std::unique_ptr<tateyama::api::server::blob_info>, pointer_comp<tateyama::api::server::blob_info>> blobs_{}; // NOLINT(cppcoreguidelines-non-private-member-variables-in-classes,misc-non-private-member-variables-in-classes)

void set_state(state s) {
state_ = s;
}
Expand Down
8 changes: 0 additions & 8 deletions src/tateyama/endpoint/ipc/ipc_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,4 @@ ipc_request::dispose() {
VLOG_LP(log_trace) << static_cast<const void*>(&server_wire_) << " slot = " << index_; //NOLINT
}

std::size_t ipc_request::session_id() const {
return session_id_;
}

std::size_t ipc_request::service_id() const {
return service_id_;
}

}
Loading

0 comments on commit 55f0990

Please sign in to comment.