Skip to content

Commit

Permalink
Merge branch 'wojciech-adaptive-wrapper-fixes'
Browse files Browse the repository at this point in the history
  • Loading branch information
mikeb01 committed Dec 14, 2023
2 parents 758ddbc + 9d00e82 commit 58a6e73
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 12 deletions.
2 changes: 1 addition & 1 deletion aeron-client/src/main/c/aeronc.h
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,7 @@ typedef struct aeron_publication_constants_stct
/**
* Maximum length of a message payload that fits within a message fragment.
*
* This is he MTU length minus the message fragment header length.
* This is the MTU length minus the message fragment header length.
*/
size_t max_payload_length;

Expand Down
2 changes: 1 addition & 1 deletion aeron-client/src/main/cpp/ExclusivePublication.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ class CLIENT_EXPORT ExclusivePublication
/**
* Maximum length of a message payload that fits within a message fragment.
*
* This is he MTU length minus the message fragment header length.
* This is the MTU length minus the message fragment header length.
*
* @return maximum message fragment payload length.
*/
Expand Down
2 changes: 1 addition & 1 deletion aeron-client/src/main/cpp/Publication.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class CLIENT_EXPORT Publication
/**
* Maximum length of a message payload that fits within a message fragment.
*
* This is he MTU length minus the message fragment header length.
* This is the MTU length minus the message fragment header length.
*
* @return maximum message fragment payload length.
*/
Expand Down
4 changes: 2 additions & 2 deletions aeron-client/src/main/cpp_wrapper/ExclusivePublication.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,13 @@ class ExclusivePublication
/**
* Maximum length of a message payload that fits within a message fragment.
*
* This is he MTU length minus the message fragment header length.
* This is the MTU length minus the message fragment header length.
*
* @return maximum message fragment payload length.
*/
inline util::index_t maxPayloadLength() const
{
return static_cast<util::index_t>(m_constants.max_message_length);
return static_cast<util::index_t>(m_constants.max_payload_length);
}

/**
Expand Down
6 changes: 3 additions & 3 deletions aeron-client/src/main/cpp_wrapper/FragmentAssembler.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class FragmentAssembler
{
BufferBuilder &builder = getBuffer(header.sessionId());
auto nextOffset = BitUtil::align(
offset + length + DataFrameHeader::LENGTH, FrameDescriptor::FRAME_ALIGNMENT);
header.termOffset() + length + DataFrameHeader::LENGTH, FrameDescriptor::FRAME_ALIGNMENT);

builder.reset().append(buffer, offset, length, header).nextTermOffset(nextOffset);
}
Expand All @@ -112,7 +112,7 @@ class FragmentAssembler
{
BufferBuilder &builder = result->second;

if (offset == builder.nextTermOffset())
if (header.termOffset() == builder.nextTermOffset())
{
builder.append(buffer, offset, length, header);

Expand All @@ -129,7 +129,7 @@ class FragmentAssembler
else
{
auto nextOffset = BitUtil::align(
offset + length + DataFrameHeader::LENGTH, FrameDescriptor::FRAME_ALIGNMENT);
header.termOffset() + length + DataFrameHeader::LENGTH, FrameDescriptor::FRAME_ALIGNMENT);
builder.nextTermOffset(nextOffset);
}
}
Expand Down
4 changes: 2 additions & 2 deletions aeron-client/src/main/cpp_wrapper/Publication.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,13 +190,13 @@ class Publication
/**
* Maximum length of a message payload that fits within a message fragment.
*
* This is he MTU length minus the message fragment header length.
* This is the MTU length minus the message fragment header length.
*
* @return maximum message fragment payload length.
*/
inline util::index_t maxPayloadLength() const
{
return static_cast<util::index_t>(m_constants.max_message_length);
return static_cast<util::index_t>(m_constants.max_payload_length);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion aeron-client/src/main/java/io/aeron/Publication.java
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public int maxMessageLength()
/**
* Maximum length of a message payload that fits within a message fragment.
* <p>
* This is he MTU length minus the message fragment header length.
* This is the MTU length minus the message fragment header length.
*
* @return maximum message fragment payload length.
*/
Expand Down
80 changes: 79 additions & 1 deletion aeron-client/src/test/cpp_wrapper/SystemTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
*/

#include <functional>
#include <random>
#include <climits>

#include <gtest/gtest.h>

#include "EmbeddedMediaDriver.h"
#include "Aeron.h"
#include "FragmentAssembler.h"
#include "TestUtil.h"

using namespace aeron;
Expand Down Expand Up @@ -191,4 +194,79 @@ TEST_F(SystemTest, shouldAddRemoveCloseHandler)

EXPECT_EQ(1, closeCount1);
EXPECT_EQ(0, closeCount2);
}
}

class Exchanger
{
using generator_t = std::independent_bits_engine<std::default_random_engine, CHAR_BIT, unsigned short>;

public:
explicit Exchanger(
const std::shared_ptr<Subscription> &subscription,
const std::shared_ptr<ExclusivePublication> &publication) :
m_subscription(subscription),
m_publication(publication),
m_assembler(
FragmentAssembler(
[&](AtomicBuffer &buffer, index_t offset, index_t length, Header &header)
{
m_innerHandler(buffer, offset, length, header);
})),
m_outerHandler(m_assembler.handler()),
m_generator(generator_t(m_rd()))
{
}

void exchange(int messageSize)
{
std::vector<uint8_t> vec(messageSize);
std::generate(std::begin(vec), std::end(vec), [&] () { return static_cast<uint8_t>(m_generator()); } );

AtomicBuffer buffer(vec.data(), messageSize);
ASSERT_GT(m_publication->offer(buffer), 0);

int count = 0;
m_innerHandler = [&](AtomicBuffer &buffer, index_t offset, index_t length, Header &header)
{
count++;
ASSERT_EQ(messageSize, length);
ASSERT_EQ(0, memcmp(buffer.buffer() + offset, vec.data(), length));
};

std::int64_t t0 = aeron_epoch_clock();
while (count == 0)
{
m_subscription->poll(m_outerHandler, 10);
ASSERT_LT(aeron_epoch_clock() - t0, AERON_TEST_TIMEOUT) << "Failed waiting for: count > 0";
std::this_thread::yield();
}

ASSERT_EQ(1, count);
}

private:
std::shared_ptr<Subscription> m_subscription;
std::shared_ptr<ExclusivePublication> m_publication;
FragmentAssembler m_assembler;
fragment_handler_t m_outerHandler;
fragment_handler_t m_innerHandler;
std::random_device m_rd;
generator_t m_generator;
};

TEST_F(SystemTest, shouldFragmentAndReassembleMessagesIfNeeded)
{
std::shared_ptr<Aeron> aeron = Aeron::connect();

int32_t streamId = 1000;
int64_t subscriptionId = aeron->addSubscription(IPC_CHANNEL, streamId);
int64_t publicationId = aeron->addExclusivePublication(IPC_CHANNEL, streamId);
WAIT_FOR_NON_NULL(subscription, aeron->findSubscription(subscriptionId));
WAIT_FOR_NON_NULL(publication, aeron->findExclusivePublication(publicationId));
WAIT_FOR(publication->isConnected());

Exchanger exchanger(subscription, publication);
exchanger.exchange(publication->maxPayloadLength() + 1);
exchanger.exchange(publication->maxPayloadLength() * 3);
exchanger.exchange(32);
}

0 comments on commit 58a6e73

Please sign in to comment.