Skip to content

Commit

Permalink
[C] Ensure that error frames are pass to the client for exclusive pub…
Browse files Browse the repository at this point in the history
…lications.
  • Loading branch information
mikeb01 committed Aug 2, 2024
1 parent fbc7efc commit 0129e9a
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 54 deletions.
17 changes: 9 additions & 8 deletions aeron-client/src/main/c/aeron_client_conductor.c
Original file line number Diff line number Diff line change
Expand Up @@ -2398,15 +2398,16 @@ void aeron_client_conductor_forward_error(void *clientd, int64_t key, void *valu
aeron_publication_error_t *response = (aeron_publication_error_t *)conductor_clientd->clientd;
aeron_client_command_base_t *resource = (aeron_client_command_base_t *)value;

if (AERON_CLIENT_TYPE_PUBLICATION == resource->type)
const bool is_publication = AERON_CLIENT_TYPE_PUBLICATION == resource->type &&
((aeron_publication_t *)resource)->original_registration_id == response->registration_id;
const bool is_exclusive_publication = AERON_CLIENT_TYPE_EXCLUSIVE_PUBLICATION == resource->type &&
((aeron_exclusive_publication_t *)resource)->original_registration_id == response->registration_id;

if (is_publication || is_exclusive_publication)
{
aeron_publication_t *publication = (aeron_publication_t *)resource;
if (response->registration_id == publication->original_registration_id)
{
// TODO: Use a union.
conductor->error_frame_handler(
conductor->error_handler_clientd, (aeron_publication_error_values_t *)response);
}
// TODO: Use a union or a copy...
conductor->error_frame_handler(
conductor->error_handler_clientd, (aeron_publication_error_values_t *)response);
}
}

Expand Down
1 change: 1 addition & 0 deletions aeron-client/src/main/cpp_wrapper/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ SET(HEADERS
${CMAKE_CURRENT_SOURCE_DIR}/concurrent/status/ReadablePosition.h
${CMAKE_CURRENT_SOURCE_DIR}/concurrent/status/StatusIndicatorReader.h
${CMAKE_CURRENT_SOURCE_DIR}/concurrent/status/UnsafeBufferPosition.h
${CMAKE_CURRENT_SOURCE_DIR}/status/PublicationErrorFrame.h
${CMAKE_CURRENT_SOURCE_DIR}/util/BitUtil.h
${CMAKE_CURRENT_SOURCE_DIR}/util/CommandOption.h
${CMAKE_CURRENT_SOURCE_DIR}/util/CommandOptionParser.h
Expand Down
3 changes: 2 additions & 1 deletion aeron-client/src/main/cpp_wrapper/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,8 @@ class Context
m_onNewSubscriptionHandler(other.m_onNewSubscriptionHandler),
m_onAvailableCounterHandler(other.m_onAvailableCounterHandler),
m_onUnavailableCounterHandler(other.m_onUnavailableCounterHandler),
m_onCloseClientHandler(other.m_onCloseClientHandler)
m_onCloseClientHandler(other.m_onCloseClientHandler),
m_onErrorFrameHandler(other.m_onErrorFrameHandler)
{
other.m_context = nullptr;
}
Expand Down
90 changes: 45 additions & 45 deletions aeron-client/src/test/cpp_wrapper/WrapperSystemTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,48 +171,48 @@ TEST_F(WrapperSystemTest, shouldRejectImage)
POLL_FOR(0 < errorFrameCount, invoker);
}

//TEST_F(WrapperSystemTest, shouldRejectImageForExclusive)
//{
// Context ctx;
// ctx.useConductorAgentInvoker(true);
//
// std::atomic<std::int32_t> errorFrameCount{0};
//
// on_error_frame_t errorFrameHandler =
// [&](aeron::status::PublicationErrorFrame &errorFrame)
// {
// std::atomic_fetch_add(&errorFrameCount, 1);
// return;
// };
//
// ctx.errorFrameHandler(errorFrameHandler);
// std::shared_ptr<Aeron> aeron = Aeron::connect(ctx);
// AgentInvoker<ClientConductor> &invoker = aeron->conductorAgentInvoker();
// invoker.start();
//
// std::int64_t pubId = aeron->addExclusivePublication("aeron:udp?endpoint=localhost:10000", 10000);
// std::int64_t subId = aeron->addSubscription("aeron:udp?endpoint=localhost:10000", 10000);
// invoker.invoke();
//
// POLL_FOR_NON_NULL(pub, aeron->findExclusivePublication(pubId), invoker);
// POLL_FOR_NON_NULL(sub, aeron->findSubscription(subId), invoker);
// POLL_FOR(pub->isConnected() && sub->isConnected(), invoker);
//
// std::string message = "Hello World!";
//
// const uint8_t *data = reinterpret_cast<const uint8_t *>(message.c_str());
// POLL_FOR(0 < pub->offer(data, message.length()), invoker);
// POLL_FOR(0 < sub->poll(
// [&](concurrent::AtomicBuffer &buffer, util::index_t offset, util::index_t length, Header &header)
// {
// EXPECT_EQ(message, buffer.getStringWithoutLength(offset, length));
// },
// 1), invoker);
//
// POLL_FOR(1 == sub->imageCount(), invoker);
//
// const std::shared_ptr<Image> image = sub->imageByIndex(0);
// image->reject("No Longer Valid");
//
// POLL_FOR(0 < errorFrameCount, invoker);
//}
TEST_F(WrapperSystemTest, shouldRejectImageForExclusive)
{
Context ctx;
ctx.useConductorAgentInvoker(true);

std::atomic<std::int32_t> errorFrameCount{0};

on_error_frame_t errorFrameHandler =
[&](aeron::status::PublicationErrorFrame &errorFrame)
{
std::atomic_fetch_add(&errorFrameCount, 1);
return;
};

ctx.errorFrameHandler(errorFrameHandler);
std::shared_ptr<Aeron> aeron = Aeron::connect(ctx);
AgentInvoker<ClientConductor> &invoker = aeron->conductorAgentInvoker();
invoker.start();

std::int64_t pubId = aeron->addExclusivePublication("aeron:udp?endpoint=localhost:10000", 10000);
std::int64_t subId = aeron->addSubscription("aeron:udp?endpoint=localhost:10000", 10000);
invoker.invoke();

POLL_FOR_NON_NULL(pub, aeron->findExclusivePublication(pubId), invoker);
POLL_FOR_NON_NULL(sub, aeron->findSubscription(subId), invoker);
POLL_FOR(pub->isConnected() && sub->isConnected(), invoker);

std::string message = "Hello World!";

const uint8_t *data = reinterpret_cast<const uint8_t *>(message.c_str());
POLL_FOR(0 < pub->offer(data, message.length()), invoker);
POLL_FOR(0 < sub->poll(
[&](concurrent::AtomicBuffer &buffer, util::index_t offset, util::index_t length, Header &header)
{
EXPECT_EQ(message, buffer.getStringWithoutLength(offset, length));
},
1), invoker);

POLL_FOR(1 == sub->imageCount(), invoker);

const std::shared_ptr<Image> image = sub->imageByIndex(0);
image->reject("No Longer Valid");

POLL_FOR(0 < errorFrameCount, invoker);
}

0 comments on commit 0129e9a

Please sign in to comment.