diff --git a/aeron-client/src/main/cpp_wrapper/Context.h b/aeron-client/src/main/cpp_wrapper/Context.h index 825ee05990..1f39cb123f 100644 --- a/aeron-client/src/main/cpp_wrapper/Context.h +++ b/aeron-client/src/main/cpp_wrapper/Context.h @@ -534,6 +534,13 @@ class Context return *this; } + /** + * Set handler to receive notifications when a publication error is received for a publication that this client + * is interested in. + * + * @param handler + * @return reference to this Context instance. + */ inline this_t &errorFrameHandler(on_publication_error_frame_t &handler) { m_onErrorFrameHandler = handler; diff --git a/aeron-client/src/test/cpp_wrapper/RejectImageTest.cpp b/aeron-client/src/test/cpp_wrapper/RejectImageTest.cpp index cdd9de722d..250cb2b4cf 100644 --- a/aeron-client/src/test/cpp_wrapper/RejectImageTest.cpp +++ b/aeron-client/src/test/cpp_wrapper/RejectImageTest.cpp @@ -62,6 +62,21 @@ class RejectImageTest : public testing::TestWithParam return ""; } + static std::shared_ptr connectClient(std::atomic &counter) + { + Context ctx; + ctx.useConductorAgentInvoker(true); + + on_publication_error_frame_t errorFrameHandler = + [&](aeron::status::PublicationErrorFrame &errorFrame) + { + std::atomic_fetch_add(&counter, 1); + }; + + ctx.errorFrameHandler(errorFrameHandler); + return Aeron::connect(ctx); + } + protected: EmbeddedMediaDriver m_driver; }; @@ -176,3 +191,65 @@ TEST_P(RejectImageTest, shouldRejectImageForExclusive) POLL_FOR(0 < errorFrameCount, invoker); } + +TEST_P(RejectImageTest, shouldOnlySeePublicationErrorFramesForPublicationsAddedToTheClient) +{ + const std::string address = GetParam(); + const std::string channel = "aeron:udp?endpoint=" + address + ":10000"; + const int streamId = 10000; + + std::atomic errorFrameCount0{0}; + std::shared_ptr aeron0 = connectClient(errorFrameCount0); + AgentInvoker &invoker0 = aeron0->conductorAgentInvoker(); + + std::atomic errorFrameCount1{0}; + std::shared_ptr aeron1 = connectClient(errorFrameCount1); + AgentInvoker &invoker1 = aeron1->conductorAgentInvoker(); + + std::atomic errorFrameCount2{0}; + std::shared_ptr aeron2 = connectClient(errorFrameCount2); + AgentInvoker &invoker2 = aeron2->conductorAgentInvoker(); + + invoker0.start(); + invoker1.start(); + invoker2.start(); + + std::int64_t pub0Id = aeron0->addPublication(channel, streamId); + std::int64_t subId = aeron0->addSubscription(channel, streamId); + invoker0.invoke(); + + POLL_FOR_NON_NULL(pub0, aeron0->findPublication(pub0Id), invoker0); + POLL_FOR_NON_NULL(sub, aeron0->findSubscription(subId), invoker0); + POLL_FOR(pub0->isConnected() && sub->isConnected(), invoker0); + + std::int64_t pub1Id = aeron1->addPublication(channel, streamId); + invoker1.invoke(); + POLL_FOR_NON_NULL(pub1, aeron1->findPublication(pub1Id), invoker1); + + std::string message = "Hello World!"; + + auto *data = reinterpret_cast(message.c_str()); + POLL_FOR(0 < pub0->offer(data, message.length()), invoker0); + POLL_FOR(0 < sub->poll( + [&](concurrent::AtomicBuffer &buffer, util::index_t offset, util::index_t length, Header &header) + { + EXPECT_EQ(message, buffer.getStringWithoutLength(offset, length)); + }, + 1), invoker0); + + POLL_FOR(1 == sub->imageCount(), invoker0); + + const std::shared_ptr image = sub->imageByIndex(0); + image->reject("No Longer Valid"); + + POLL_FOR(0 < errorFrameCount0, invoker0); + POLL_FOR(0 < errorFrameCount1, invoker1); + + int64_t timeout_ms = aeron_epoch_clock() + 500; + while (aeron_epoch_clock() < timeout_ms) + { + invoker2.invoke(); + ASSERT_EQ(0, errorFrameCount2); + std::this_thread::sleep_for(std::chrono::duration(1)); + } +}