From 625d5a822cf318a67fb2ccce4488417dfe669178 Mon Sep 17 00:00:00 2001 From: Wojciech Lukowicz Date: Thu, 14 Dec 2023 15:42:29 +0000 Subject: [PATCH 1/3] [C++] Fix wrapper's [Exclusive]Publication.maxPayloadLength(). --- aeron-client/src/main/c/aeronc.h | 2 +- aeron-client/src/main/cpp/ExclusivePublication.h | 2 +- aeron-client/src/main/cpp/Publication.h | 2 +- aeron-client/src/main/cpp_wrapper/ExclusivePublication.h | 4 ++-- aeron-client/src/main/cpp_wrapper/Publication.h | 4 ++-- aeron-client/src/main/java/io/aeron/Publication.java | 2 +- 6 files changed, 8 insertions(+), 8 deletions(-) diff --git a/aeron-client/src/main/c/aeronc.h b/aeron-client/src/main/c/aeronc.h index 241c931628..683f426c78 100644 --- a/aeron-client/src/main/c/aeronc.h +++ b/aeron-client/src/main/c/aeronc.h @@ -915,7 +915,7 @@ typedef struct aeron_publication_constants_stct /** * Maximum length of a message payload that fits within a message fragment. * - * This is he MTU length minus the message fragment header length. + * This is the MTU length minus the message fragment header length. */ size_t max_payload_length; diff --git a/aeron-client/src/main/cpp/ExclusivePublication.h b/aeron-client/src/main/cpp/ExclusivePublication.h index 18d1e67921..48abc3c69c 100644 --- a/aeron-client/src/main/cpp/ExclusivePublication.h +++ b/aeron-client/src/main/cpp/ExclusivePublication.h @@ -167,7 +167,7 @@ class CLIENT_EXPORT ExclusivePublication /** * Maximum length of a message payload that fits within a message fragment. * - * This is he MTU length minus the message fragment header length. + * This is the MTU length minus the message fragment header length. * * @return maximum message fragment payload length. */ diff --git a/aeron-client/src/main/cpp/Publication.h b/aeron-client/src/main/cpp/Publication.h index 47a1e85182..c7557e0205 100644 --- a/aeron-client/src/main/cpp/Publication.h +++ b/aeron-client/src/main/cpp/Publication.h @@ -178,7 +178,7 @@ class CLIENT_EXPORT Publication /** * Maximum length of a message payload that fits within a message fragment. * - * This is he MTU length minus the message fragment header length. + * This is the MTU length minus the message fragment header length. * * @return maximum message fragment payload length. */ diff --git a/aeron-client/src/main/cpp_wrapper/ExclusivePublication.h b/aeron-client/src/main/cpp_wrapper/ExclusivePublication.h index 98598129d8..ca5e5d1270 100644 --- a/aeron-client/src/main/cpp_wrapper/ExclusivePublication.h +++ b/aeron-client/src/main/cpp_wrapper/ExclusivePublication.h @@ -159,13 +159,13 @@ class ExclusivePublication /** * Maximum length of a message payload that fits within a message fragment. * - * This is he MTU length minus the message fragment header length. + * This is the MTU length minus the message fragment header length. * * @return maximum message fragment payload length. */ inline util::index_t maxPayloadLength() const { - return static_cast(m_constants.max_message_length); + return static_cast(m_constants.max_payload_length); } /** diff --git a/aeron-client/src/main/cpp_wrapper/Publication.h b/aeron-client/src/main/cpp_wrapper/Publication.h index 69d92cae5b..42a3f8d902 100644 --- a/aeron-client/src/main/cpp_wrapper/Publication.h +++ b/aeron-client/src/main/cpp_wrapper/Publication.h @@ -190,13 +190,13 @@ class Publication /** * Maximum length of a message payload that fits within a message fragment. * - * This is he MTU length minus the message fragment header length. + * This is the MTU length minus the message fragment header length. * * @return maximum message fragment payload length. */ inline util::index_t maxPayloadLength() const { - return static_cast(m_constants.max_message_length); + return static_cast(m_constants.max_payload_length); } /** diff --git a/aeron-client/src/main/java/io/aeron/Publication.java b/aeron-client/src/main/java/io/aeron/Publication.java index 6a7a3d8a4a..4033c894e0 100644 --- a/aeron-client/src/main/java/io/aeron/Publication.java +++ b/aeron-client/src/main/java/io/aeron/Publication.java @@ -230,7 +230,7 @@ public int maxMessageLength() /** * Maximum length of a message payload that fits within a message fragment. *

- * This is he MTU length minus the message fragment header length. + * This is the MTU length minus the message fragment header length. * * @return maximum message fragment payload length. */ From 3f2ff4e0523a50fc274cc2e53917fdd9959446cd Mon Sep 17 00:00:00 2001 From: Wojciech Lukowicz Date: Thu, 14 Dec 2023 16:27:04 +0000 Subject: [PATCH 2/3] [C++] Fix wrapper's FragmentAssembler. In the wrapper offset is always 0, we need to take it from the header instead like the C client does. --- .../src/main/cpp_wrapper/FragmentAssembler.h | 6 +- .../src/test/cpp_wrapper/SystemTest.cpp | 80 ++++++++++++++++++- 2 files changed, 82 insertions(+), 4 deletions(-) diff --git a/aeron-client/src/main/cpp_wrapper/FragmentAssembler.h b/aeron-client/src/main/cpp_wrapper/FragmentAssembler.h index 5a24c9a922..b45869fcd2 100644 --- a/aeron-client/src/main/cpp_wrapper/FragmentAssembler.h +++ b/aeron-client/src/main/cpp_wrapper/FragmentAssembler.h @@ -100,7 +100,7 @@ class FragmentAssembler { BufferBuilder &builder = getBuffer(header.sessionId()); auto nextOffset = BitUtil::align( - offset + length + DataFrameHeader::LENGTH, FrameDescriptor::FRAME_ALIGNMENT); + header.termOffset() + length + DataFrameHeader::LENGTH, FrameDescriptor::FRAME_ALIGNMENT); builder.reset().append(buffer, offset, length, header).nextTermOffset(nextOffset); } @@ -112,7 +112,7 @@ class FragmentAssembler { BufferBuilder &builder = result->second; - if (offset == builder.nextTermOffset()) + if (header.termOffset() == builder.nextTermOffset()) { builder.append(buffer, offset, length, header); @@ -129,7 +129,7 @@ class FragmentAssembler else { auto nextOffset = BitUtil::align( - offset + length + DataFrameHeader::LENGTH, FrameDescriptor::FRAME_ALIGNMENT); + header.termOffset() + length + DataFrameHeader::LENGTH, FrameDescriptor::FRAME_ALIGNMENT); builder.nextTermOffset(nextOffset); } } diff --git a/aeron-client/src/test/cpp_wrapper/SystemTest.cpp b/aeron-client/src/test/cpp_wrapper/SystemTest.cpp index 31e0357fca..ff239014dc 100644 --- a/aeron-client/src/test/cpp_wrapper/SystemTest.cpp +++ b/aeron-client/src/test/cpp_wrapper/SystemTest.cpp @@ -15,11 +15,14 @@ */ #include +#include +#include #include #include "EmbeddedMediaDriver.h" #include "Aeron.h" +#include "FragmentAssembler.h" #include "TestUtil.h" using namespace aeron; @@ -191,4 +194,79 @@ TEST_F(SystemTest, shouldAddRemoveCloseHandler) EXPECT_EQ(1, closeCount1); EXPECT_EQ(0, closeCount2); -} \ No newline at end of file +} + +class Exchanger +{ + using generator_t = std::independent_bits_engine; + +public: + explicit Exchanger( + const std::shared_ptr &subscription, + const std::shared_ptr &publication) : + m_subscription(subscription), + m_publication(publication), + m_assembler( + FragmentAssembler( + [&](AtomicBuffer &buffer, index_t offset, index_t length, Header &header) + { + m_innerHandler(buffer, offset, length, header); + })), + m_outerHandler(m_assembler.handler()), + m_generator(generator_t(m_rd())) + { + } + + void exchange(int messageSize) + { + std::vector vec(messageSize); + std::generate(std::begin(vec), std::end(vec), std::ref(m_generator)); + + AtomicBuffer buffer(vec.data(), messageSize); + ASSERT_GT(m_publication->offer(buffer), 0); + + int count = 0; + m_innerHandler = [&](AtomicBuffer &buffer, index_t offset, index_t length, Header &header) + { + count++; + ASSERT_EQ(messageSize, length); + ASSERT_EQ(0, memcmp(buffer.buffer() + offset, vec.data(), length)); + }; + + std::int64_t t0 = aeron_epoch_clock(); + while (count == 0) + { + m_subscription->poll(m_outerHandler, 10); + ASSERT_LT(aeron_epoch_clock() - t0, AERON_TEST_TIMEOUT) << "Failed waiting for: count > 0"; + std::this_thread::yield(); + } + + ASSERT_EQ(1, count); + } + +private: + std::shared_ptr m_subscription; + std::shared_ptr m_publication; + FragmentAssembler m_assembler; + fragment_handler_t m_outerHandler; + fragment_handler_t m_innerHandler; + std::random_device m_rd; + generator_t m_generator; +}; + +TEST_F(SystemTest, shouldFragmentAndReassembleMessagesIfNeeded) +{ + std::shared_ptr aeron = Aeron::connect(); + + int32_t streamId = 1000; + int64_t subscriptionId = aeron->addSubscription(IPC_CHANNEL, streamId); + int64_t publicationId = aeron->addExclusivePublication(IPC_CHANNEL, streamId); + WAIT_FOR_NON_NULL(subscription, aeron->findSubscription(subscriptionId)); + WAIT_FOR_NON_NULL(publication, aeron->findExclusivePublication(publicationId)); + WAIT_FOR(publication->isConnected()); + + Exchanger exchanger(subscription, publication); + exchanger.exchange(publication->maxPayloadLength() + 1); + exchanger.exchange(publication->maxPayloadLength() * 3); + exchanger.exchange(32); +} From 9d00e824919a26911b6b2020369e063abc6e8514 Mon Sep 17 00:00:00 2001 From: Michael Barker Date: Thu, 14 Dec 2023 11:37:34 -1100 Subject: [PATCH 3/3] [C] Use unsigned short for generator in SystemTest and cast to uint_8 to match with standards definition for the std::independent_bits_engine. --- aeron-client/src/test/cpp_wrapper/SystemTest.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aeron-client/src/test/cpp_wrapper/SystemTest.cpp b/aeron-client/src/test/cpp_wrapper/SystemTest.cpp index ff239014dc..97847eb5de 100644 --- a/aeron-client/src/test/cpp_wrapper/SystemTest.cpp +++ b/aeron-client/src/test/cpp_wrapper/SystemTest.cpp @@ -198,7 +198,7 @@ TEST_F(SystemTest, shouldAddRemoveCloseHandler) class Exchanger { - using generator_t = std::independent_bits_engine; + using generator_t = std::independent_bits_engine; public: explicit Exchanger( @@ -220,7 +220,7 @@ class Exchanger void exchange(int messageSize) { std::vector vec(messageSize); - std::generate(std::begin(vec), std::end(vec), std::ref(m_generator)); + std::generate(std::begin(vec), std::end(vec), [&] () { return static_cast(m_generator()); } ); AtomicBuffer buffer(vec.data(), messageSize); ASSERT_GT(m_publication->offer(buffer), 0);