Skip to content

Commit

Permalink
[C/C++] Add test to validate that publication error frames are passed…
Browse files Browse the repository at this point in the history
… to all clients with the same publication and not to ones that without it.
  • Loading branch information
mikeb01 committed Aug 28, 2024
1 parent 00ae033 commit 66d79e8
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 0 deletions.
7 changes: 7 additions & 0 deletions aeron-client/src/main/cpp_wrapper/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,13 @@ class Context
return *this;
}

/**
* Set handler to receive notifications when a publication error is received for a publication that this client
* is interested in.
*
* @param handler
* @return reference to this Context instance.
*/
inline this_t &errorFrameHandler(on_publication_error_frame_t &handler)
{
m_onErrorFrameHandler = handler;
Expand Down
77 changes: 77 additions & 0 deletions aeron-client/src/test/cpp_wrapper/RejectImageTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,21 @@ class RejectImageTest : public testing::TestWithParam<std::string>
return "";
}

static std::shared_ptr<Aeron> connectClient(std::atomic<std::int32_t > &counter)
{
Context ctx;
ctx.useConductorAgentInvoker(true);

on_publication_error_frame_t errorFrameHandler =
[&](aeron::status::PublicationErrorFrame &errorFrame)
{
std::atomic_fetch_add(&counter, 1);
};

ctx.errorFrameHandler(errorFrameHandler);
return Aeron::connect(ctx);
}

protected:
EmbeddedMediaDriver m_driver;
};
Expand Down Expand Up @@ -176,3 +191,65 @@ TEST_P(RejectImageTest, shouldRejectImageForExclusive)

POLL_FOR(0 < errorFrameCount, invoker);
}

TEST_P(RejectImageTest, shouldOnlySeePublicationErrorFramesForPublicationsAddedToTheClient)
{
const std::string address = GetParam();
const std::string channel = "aeron:udp?endpoint=" + address + ":10000";
const int streamId = 10000;

std::atomic<std::int32_t> errorFrameCount0{0};
std::shared_ptr<Aeron> aeron0 = connectClient(errorFrameCount0);
AgentInvoker<ClientConductor> &invoker0 = aeron0->conductorAgentInvoker();

std::atomic<std::int32_t> errorFrameCount1{0};
std::shared_ptr<Aeron> aeron1 = connectClient(errorFrameCount1);
AgentInvoker<ClientConductor> &invoker1 = aeron1->conductorAgentInvoker();

std::atomic<std::int32_t> errorFrameCount2{0};
std::shared_ptr<Aeron> aeron2 = connectClient(errorFrameCount2);
AgentInvoker<ClientConductor> &invoker2 = aeron2->conductorAgentInvoker();

invoker0.start();
invoker1.start();
invoker2.start();

std::int64_t pub0Id = aeron0->addPublication(channel, streamId);
std::int64_t subId = aeron0->addSubscription(channel, streamId);
invoker0.invoke();

POLL_FOR_NON_NULL(pub0, aeron0->findPublication(pub0Id), invoker0);
POLL_FOR_NON_NULL(sub, aeron0->findSubscription(subId), invoker0);
POLL_FOR(pub0->isConnected() && sub->isConnected(), invoker0);

std::int64_t pub1Id = aeron1->addPublication(channel, streamId);
invoker1.invoke();
POLL_FOR_NON_NULL(pub1, aeron1->findPublication(pub1Id), invoker1);

std::string message = "Hello World!";

auto *data = reinterpret_cast<const uint8_t *>(message.c_str());
POLL_FOR(0 < pub0->offer(data, message.length()), invoker0);
POLL_FOR(0 < sub->poll(
[&](concurrent::AtomicBuffer &buffer, util::index_t offset, util::index_t length, Header &header)
{
EXPECT_EQ(message, buffer.getStringWithoutLength(offset, length));
},
1), invoker0);

POLL_FOR(1 == sub->imageCount(), invoker0);

const std::shared_ptr<Image> image = sub->imageByIndex(0);
image->reject("No Longer Valid");

POLL_FOR(0 < errorFrameCount0, invoker0);
POLL_FOR(0 < errorFrameCount1, invoker1);

int64_t timeout_ms = aeron_epoch_clock() + 500;
while (aeron_epoch_clock() < timeout_ms)
{
invoker2.invoke();
ASSERT_EQ(0, errorFrameCount2);
std::this_thread::sleep_for(std::chrono::duration<long, std::milli>(1));
}
}

0 comments on commit 66d79e8

Please sign in to comment.