Skip to content

Commit

Permalink
[C] compare publication stream id with link stream id when checking f…
Browse files Browse the repository at this point in the history
…or matching spy subscriptions (#1722)
  • Loading branch information
nbradac authored Jan 16, 2025
1 parent b283de9 commit 2b1ed72
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 10 deletions.
14 changes: 5 additions & 9 deletions aeron-driver/src/main/c/aeron_driver_conductor.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,13 @@ static bool aeron_driver_conductor_network_subscription_link_matches_allowing_wi

static bool aeron_driver_conductor_spy_subscription_link_matches(
const aeron_subscription_link_t *link,
const aeron_network_publication_t *publication,
const int stream_id)
const aeron_network_publication_t *publication)
{
aeron_udp_channel_t *const spy_channel = link->spy_channel;
const aeron_udp_channel_t *publication_channel = publication->endpoint->conductor_fields.udp_channel;
bool is_same_channel_tag =
AERON_URI_INVALID_TAG != spy_channel->tag_id && spy_channel->tag_id == publication_channel->tag_id;
return stream_id == publication->stream_id &&
return link->stream_id == publication->stream_id &&
(is_same_channel_tag ||
(aeron_driver_conductor_is_wildcard_or_session_id_match(link, publication->session_id) &&
publication_channel->canonical_length == spy_channel->canonical_length &&
Expand Down Expand Up @@ -4202,8 +4201,7 @@ int aeron_driver_conductor_on_add_network_publication_complete(
{
aeron_subscription_link_t *subscription_link = &conductor->spy_subscriptions.array[i];

if (aeron_driver_conductor_spy_subscription_link_matches(
subscription_link, publication, command->stream_id) &&
if (aeron_driver_conductor_spy_subscription_link_matches(subscription_link, publication) &&
!aeron_driver_conductor_is_subscribable_linked(subscription_link, subscribable))
{
if (aeron_driver_conductor_link_subscribable(
Expand Down Expand Up @@ -4456,8 +4454,7 @@ int aeron_driver_conductor_on_add_spy_subscription_complete(
{
aeron_network_publication_t *publication = conductor->network_publications.array[i].publication;

if (aeron_driver_conductor_spy_subscription_link_matches(
link, publication, command->stream_id) &&
if (aeron_driver_conductor_spy_subscription_link_matches(link, publication) &&
aeron_network_publication_is_accepting_subscriptions(publication))
{
if (aeron_driver_conductor_link_subscribable(
Expand Down Expand Up @@ -5302,8 +5299,7 @@ int aeron_driver_conductor_on_add_receive_spy_destination_complete(
{
aeron_network_publication_t *publication = conductor->network_publications.array[i].publication;

if (aeron_driver_conductor_spy_subscription_link_matches(
link, publication, mds_subscription_link->stream_id) &&
if (aeron_driver_conductor_spy_subscription_link_matches(link, publication) &&
aeron_network_publication_is_accepting_subscriptions(publication))
{
if (aeron_driver_conductor_link_subscribable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ void after()
@FieldSource("channels")
void shouldAddWildcardSpyBeforePublication(final String channel)
{
TestMediaDriver.notSupportedOnCMediaDriver("spy bug");
final String subUri = CommonContext.SPY_PREFIX + channel;
final String pubUri = channel + "|ssc=true";
final int streamId1 = 333;
Expand Down

0 comments on commit 2b1ed72

Please sign in to comment.