diff --git a/include/ocpp/common/message_queue.hpp b/include/ocpp/common/message_queue.hpp index eed83426cc..be1faf65cb 100644 --- a/include/ocpp/common/message_queue.hpp +++ b/include/ocpp/common/message_queue.hpp @@ -71,9 +71,15 @@ template struct ControlMessage { /// \brief Provides the unique message ID stored in the message /// \returns the unique ID of the contained message - MessageId uniqueId() const { + [[nodiscard]] MessageId uniqueId() const { return this->message[MESSAGE_ID]; } + + /// \brief Determine whether message is considered as transaction-related. + bool isTransactionMessage() const; + + /// \brief True for transactional messages containing updates (measurements) for a transaction + bool isTransactionUpdateMessage() const; }; /// \brief contains a message queue that makes sure that OCPPs synchronicity requirements are met @@ -138,8 +144,6 @@ template class MessageQueue { return false; } - bool isTransactionMessage(const std::shared_ptr> message) const; - void add_to_normal_message_queue(std::shared_ptr> message) { EVLOG_debug << "Adding message to normal message queue"; { @@ -181,6 +185,11 @@ template class MessageQueue { !this->normal_message_queue.empty()) { this->drop_messages_from_normal_message_queue(); } + + while (this->transaction_message_queue.size() + this->normal_message_queue.size() > + this->config.queues_total_size_threshold && + this->drop_update_messages_from_transactional_message_queue()) { + } } void drop_messages_from_normal_message_queue() { @@ -195,6 +204,43 @@ template class MessageQueue { } } + /** + * Heuristically drops every second update messag. + * Drops every first, third, ... update message in between two non-update message; disregards transaction + * ids etc! + * Cf. OCPP 2.0.1. specification 2.1.9 "QueueAllMessages" + */ + bool drop_update_messages_from_transactional_message_queue() { + int drop_count = 0; + std::deque>> temporary_swap_queue; + bool remove_next_update_message = true; + while (!transaction_message_queue.empty()) { + auto element = transaction_message_queue.front(); + transaction_message_queue.pop_front(); + // drop every second update message (except last one) + if (remove_next_update_message && element->isTransactionUpdateMessage() && + transaction_message_queue.size() > 1) { + EVLOG_debug << "Drop transactional message " << element->uniqueId(); + database_handler->remove_transaction_message(element->uniqueId()); + drop_count++; + remove_next_update_message = false; + } else { + remove_next_update_message = true; + temporary_swap_queue.push_back(element); + } + } + + std::swap(transaction_message_queue, temporary_swap_queue); + + if (drop_count > 0) { + EVLOG_warning << "Dropped " << drop_count << " transactional update messages to reduce queue size."; + return true; + } else { + EVLOG_warning << "There are no further transaction update messages to drop!"; + return false; + } + } + public: /// \brief Creates a new MessageQueue object with the provided \p configuration and \p send_callback MessageQueue(const std::function& send_callback, const MessageQueueConfig& config, @@ -307,7 +353,7 @@ template class MessageQueue { if (!this->send_callback(this->in_flight->message)) { this->paused = true; EVLOG_error << "Could not send message, this is most likely because the charge point is offline."; - if (this->isTransactionMessage(this->in_flight)) { + if (this->in_flight && this->in_flight->isTransactionMessage()) { EVLOG_info << "The message in flight is transaction related and will be sent again once the " "connection can be established again."; if (this->in_flight->message.at(CALL_ACTION) == "TransactionEvent") { @@ -382,7 +428,7 @@ template class MessageQueue { } auto message = std::make_shared>(call); - if (this->isTransactionMessage(message)) { + if (message->isTransactionMessage()) { // according to the spec the "transaction related messages" StartTransaction, StopTransaction and // MeterValues have to be delivered in chronological order @@ -440,7 +486,7 @@ template class MessageQueue { auto enhanced_message = EnhancedMessage(); enhanced_message.offline = true; message->promise.set_value(enhanced_message); - } else if (this->isTransactionMessage(message)) { + } else if (message->isTransactionMessage()) { // according to the spec the "transaction related messages" StartTransaction, StopTransaction and // MeterValues have to be delivered in chronological order this->add_to_transaction_message_queue(message); @@ -528,7 +574,7 @@ template class MessageQueue { this->in_flight->message.at(CALL_ACTION).template get() + std::string("Response")); this->in_flight->promise.set_value(enhanced_message); - if (isTransactionMessage(this->in_flight)) { + if (this->in_flight->isTransactionMessage()) { // We only remove the message as soon as a response is received. Otherwise we might miss a message if // the charging station just boots after sending, but before receiving the result. this->database_handler->remove_transaction_message(this->in_flight->uniqueId()); @@ -551,7 +597,7 @@ template class MessageQueue { std::lock_guard lk(this->message_mutex); EVLOG_warning << "Message timeout or CALLERROR for: " << this->in_flight->messageType << " (" << this->in_flight->uniqueId() << ")"; - if (this->isTransactionMessage(this->in_flight)) { + if (this->in_flight->isTransactionMessage()) { if (this->in_flight->message_attempts < this->config.transaction_message_attempts) { EVLOG_warning << "Message is transaction related and will therefore be sent again"; this->in_flight->message[MESSAGE_ID] = this->createMessageId(); diff --git a/lib/ocpp/common/message_queue.cpp b/lib/ocpp/common/message_queue.cpp index 1a76b916d4..9844a8a66e 100644 --- a/lib/ocpp/common/message_queue.cpp +++ b/lib/ocpp/common/message_queue.cpp @@ -13,36 +13,37 @@ template <> ControlMessage::ControlMessage(json message) { this->message_attempts = 0; } +template <> bool ControlMessage::isTransactionMessage() const { + if (this->messageType == v16::MessageType::StartTransaction || + this->messageType == v16::MessageType::StopTransaction || this->messageType == v16::MessageType::MeterValues || + this->messageType == v16::MessageType::SecurityEventNotification) { + return true; + } + return false; +} + +template <> bool ControlMessage::isTransactionUpdateMessage() const { + return (this->messageType == v16::MessageType::MeterValues); +} + template <> ControlMessage::ControlMessage(json message) { this->message = message.get(); this->messageType = v201::conversions::string_to_messagetype(message.at(CALL_ACTION)); this->message_attempts = 0; } -template <> -bool MessageQueue::isTransactionMessage( - const std::shared_ptr> message) const { - if (message == nullptr) { - return false; - } - if (message->messageType == v16::MessageType::StartTransaction || - message->messageType == v16::MessageType::StopTransaction || - message->messageType == v16::MessageType::MeterValues || - message->messageType == v16::MessageType::SecurityEventNotification) { +template <> bool ControlMessage::isTransactionMessage() const { + if (this->messageType == v201::MessageType::TransactionEvent || + this->messageType == v201::MessageType::SecurityEventNotification) { // A04.FR.02 return true; } return false; } -template <> -bool MessageQueue::isTransactionMessage( - const std::shared_ptr> message) const { - if (message == nullptr) { - return false; - } - if (message->messageType == v201::MessageType::TransactionEvent || - message->messageType == v201::MessageType::SecurityEventNotification) { // A04.FR.02 - return true; +template <> bool ControlMessage::isTransactionUpdateMessage() const { + if (this->messageType == v201::MessageType::TransactionEvent) { + return v201::TransactionEventRequest{this->message.at(CALL_PAYLOAD)}.eventType == + v201::TransactionEventEnum::Updated; } return false; } diff --git a/tests/lib/ocpp/common/test_message_queue.cpp b/tests/lib/ocpp/common/test_message_queue.cpp index 8327ffb3bb..0a3f8d7cfd 100644 --- a/tests/lib/ocpp/common/test_message_queue.cpp +++ b/tests/lib/ocpp/common/test_message_queue.cpp @@ -1,18 +1,28 @@ // SPDX-License-Identifier: Apache-2.0 // Copyright 2020 - 2023 Pionix GmbH and Contributors to EVerest - #include #include #include #include +#include +#include +#include +#include +#include namespace ocpp { using json = nlohmann::json; +/************************************************************************************************ + * Test Message Types + */ + enum class TestMessageType { TRANSACTIONAL, TRANSACTIONAL_RESPONSE, + TRANSACTIONAL_UPDATE, + TRANSACTIONAL_UPDATE_RESPONSE, NON_TRANSACTIONAL, NON_TRANSACTIONAL_RESPONSE, InternalError, @@ -25,6 +35,10 @@ static std::string to_string(TestMessageType m) { return "transactional"; case TestMessageType::TRANSACTIONAL_RESPONSE: return "transactionalResponse"; + case TestMessageType::TRANSACTIONAL_UPDATE: + return "transactional_update"; + case TestMessageType::TRANSACTIONAL_UPDATE_RESPONSE: + return "transactional_updateResponse"; case TestMessageType::NON_TRANSACTIONAL: return "non_transactional"; case TestMessageType::NON_TRANSACTIONAL_RESPONSE: @@ -44,6 +58,12 @@ static TestMessageType to_test_message_type(const std::string& s) { if (s == "transactionalResponse") { return TestMessageType::TRANSACTIONAL_RESPONSE; } + if (s == "transactional_update") { + return TestMessageType::TRANSACTIONAL_UPDATE; + } + if (s == "transactional_updateResponse") { + return TestMessageType::TRANSACTIONAL_UPDATE_RESPONSE; + } if (s == "non_transactional") { return TestMessageType::NON_TRANSACTIONAL; } @@ -100,15 +120,102 @@ std::ostream& operator<<(std::ostream& os, const TestMessageType& message_type) return os; }; -template <> -bool MessageQueue::isTransactionMessage( - const std::shared_ptr> message) const { - if (message == nullptr) { - return false; - } - return message->messageType == TestMessageType::TRANSACTIONAL; +template <> bool ControlMessage::isTransactionMessage() const { + return this->messageType == TestMessageType::TRANSACTIONAL || + this->messageType == TestMessageType::TRANSACTIONAL_UPDATE; +} + +template <> bool ControlMessage::isTransactionUpdateMessage() const { + return this->messageType == TestMessageType::TRANSACTIONAL_UPDATE; +} + +/************************************************************************************************ + * ControlMessage + * + * Test implementations of ControlMessage template + */ +class ControlMessageV16Test : public ::testing::Test { + +protected: +}; + +TEST_F(ControlMessageV16Test, test_is_transactional) { + + EXPECT_TRUE( + (ControlMessage{Call{v16::StartTransactionRequest{}, "0"}}) + .isTransactionMessage()); + EXPECT_TRUE( + (ControlMessage{Call{v16::StopTransactionRequest{}, "0"}}) + .isTransactionMessage()); + EXPECT_TRUE((ControlMessage{ + Call{v16::SecurityEventNotificationRequest{}, "0"}}) + .isTransactionMessage()); + EXPECT_TRUE((ControlMessage{Call{v16::MeterValuesRequest{}, "0"}}) + .isTransactionMessage()); + + EXPECT_TRUE(!(ControlMessage{Call{v16::AuthorizeRequest{}, "0"}}) + .isTransactionMessage()); +} + +TEST_F(ControlMessageV16Test, test_is_transactional_update) { + + EXPECT_TRUE( + !(ControlMessage{Call{v16::StartTransactionRequest{}, "0"}}) + .isTransactionUpdateMessage()); + EXPECT_TRUE( + !(ControlMessage{Call{v16::StopTransactionRequest{}, "0"}}) + .isTransactionUpdateMessage()); + EXPECT_TRUE(!(ControlMessage{ + Call{v16::SecurityEventNotificationRequest{}, "0"}}) + .isTransactionUpdateMessage()); + EXPECT_TRUE((ControlMessage{Call{v16::MeterValuesRequest{}, "0"}}) + .isTransactionUpdateMessage()); + + EXPECT_TRUE(!(ControlMessage{Call{v16::AuthorizeRequest{}, "0"}}) + .isTransactionUpdateMessage()); +} + +class ControlMessageV201Test : public ::testing::Test { + +protected: +}; + +TEST_F(ControlMessageV201Test, test_is_transactional) { + + EXPECT_TRUE( + (ControlMessage{Call{v201::TransactionEventRequest{}, "0"}}) + .isTransactionMessage()); + + EXPECT_TRUE(!(ControlMessage{Call{v201::AuthorizeRequest{}, "0"}}) + .isTransactionMessage()); +} + +TEST_F(ControlMessageV201Test, test_is_transactional_update) { + + v201::TransactionEventRequest transaction_event_request{}; + transaction_event_request.eventType = v201::TransactionEventEnum::Updated; + + EXPECT_TRUE((ControlMessage{Call{transaction_event_request, "0"}}) + .isTransactionUpdateMessage()); + + transaction_event_request.eventType = v201::TransactionEventEnum::Started; + EXPECT_TRUE( + !(ControlMessage{Call{transaction_event_request, "0"}}) + .isTransactionUpdateMessage()); + + transaction_event_request.eventType = v201::TransactionEventEnum::Ended; + EXPECT_TRUE( + !(ControlMessage{Call{transaction_event_request, "0"}}) + .isTransactionUpdateMessage()); + + EXPECT_TRUE(!(ControlMessage{Call{v201::AuthorizeRequest{}, "0"}}) + .isTransactionUpdateMessage()); } +/************************************************************************************************ + * MessageQueueTest + */ + class DatabaseHandlerBaseMock : public common::DatabaseHandlerBase { public: MOCK_METHOD(std::vector, get_transaction_messages, (), (override)); @@ -121,7 +228,7 @@ class MessageQueueTest : public ::testing::Test { int call_count{0}; protected: - MessageQueueConfig config; + MessageQueueConfig config{}; std::shared_ptr db; std::mutex call_marker_mutex; std::condition_variable call_marker_cond_var; @@ -135,7 +242,7 @@ class MessageQueueTest : public ::testing::Test { } template auto MarkAndReturn(R value, bool respond = false) { - return testing::Invoke([this, value, respond](json::array_t s) -> R { + return testing::Invoke([this, value, respond](const json::array_t& s) -> R { if (respond) { reception_timer.timeout( [this, s]() { @@ -362,4 +469,81 @@ TEST_F(MessageQueueTest, test_clean_up_non_transactional_queue) { get_call_count()); } +// \brief Test that if the max size threshold is exceeded, intermediate transactional (update) messages are dropped +// Sends both non-transactions and transactional messages while on pause, expects all non-transactional, and any except +// every forth transactional to be dropped +TEST_F(MessageQueueTest, test_clean_up_transactional_queue) { + + const int sent_non_transactional_messages = 10; + const std::vector transaction_update_messages{0, 4, 6, + 2}; // meaning there are 4 transactions, each with a "start" and + // "stop" message and the provided number of updates; + // in total 4*2 + 4+ 6 +2 = 20 messages + config.queues_total_size_threshold = 13; + /** + * Message IDs: + * non-transactional: 0 - 9 + * Transaction I: 10 - 11 + * Transaction II: 12 - 17 + * Transaction III: 18 - 25 + * Transaction IV: 26 - 29 + * + * Expected dropping behavior + * - adding msg 13-22 -> each drop 1 non-transactional (floored 10% of queue thresholds) + * - adding msg 23 (update of third transaction) -> drop 4 messages with ids 13,15,19,21 + * - adding msg 27 (update of fourth transaction) -> drop 3 message with ids 14,20,23 + */ + const std::set expected_dropped_transaction_messages = { + "test_call_13", "test_call_15", "test_call_19", "test_call_21", "test_call_14", "test_call_20", "test_call_23", + }; + const int expected_sent_messages = 13; + config.queue_all_messages = true; + init_message_queue(); + + EXPECT_CALL(*db, insert_transaction_message(testing::_)).Times(20).WillRepeatedly(testing::Return(true)); + EXPECT_CALL(*db, remove_transaction_message(testing::_)).Times(20).WillRepeatedly(testing::Return()); + + // go offline + message_queue->pause(); + + // Send messages / set up expected calls + testing::Sequence s; + for (int i = 0; i < sent_non_transactional_messages; i++) { + push_message_call(TestMessageType::NON_TRANSACTIONAL); + } + + for (int update_messages : transaction_update_messages) { + // transaction "start" + auto start_msg_id = push_message_call(TestMessageType::TRANSACTIONAL); + EXPECT_CALL(send_callback_mock, Call(json{2, start_msg_id, to_string(TestMessageType::TRANSACTIONAL), + json{{"data", start_msg_id}}})) + .InSequence(s) + .WillOnce(MarkAndReturn(true, true)); + + for (int i = 0; i < update_messages; i++) { + auto update_msg_id = push_message_call(TestMessageType::TRANSACTIONAL_UPDATE); + + if (!expected_dropped_transaction_messages.count(update_msg_id)) { + EXPECT_CALL(send_callback_mock, + Call(json{2, update_msg_id, to_string(TestMessageType::TRANSACTIONAL_UPDATE), + json{{"data", update_msg_id}}})) + .InSequence(s) + .WillOnce(MarkAndReturn(true, true)); + } + } + + auto stop_msg_id = push_message_call(TestMessageType::TRANSACTIONAL); + // transaction "end" + EXPECT_CALL(send_callback_mock, + Call(json{2, stop_msg_id, to_string(TestMessageType::TRANSACTIONAL), json{{"data", stop_msg_id}}})) + .InSequence(s) + .WillOnce(MarkAndReturn(true, true)); + } + + // Resume & verify + message_queue->resume(); + + wait_for_calls(expected_sent_messages); +} + } // namespace ocpp