From 0f7909ba38d054096e9b1bdb4cb1cfca94171870 Mon Sep 17 00:00:00 2001 From: Michael Barker Date: Fri, 17 Jan 2025 18:07:17 +1300 Subject: [PATCH] [C] Update C driver to use the same matching logic as the Java driver for checking the validity of tagged publications and subscriptions. --- .../src/main/c/aeron_driver_conductor.c | 66 +++-------- .../c/media/aeron_receive_channel_endpoint.c | 23 ++++ .../c/media/aeron_receive_channel_endpoint.h | 5 + .../c/media/aeron_send_channel_endpoint.c | 15 +++ .../c/media/aeron_send_channel_endpoint.h | 5 + .../src/main/c/media/aeron_udp_channel.c | 106 +++++++++++++++++- .../src/main/c/media/aeron_udp_channel.h | 44 ++------ 7 files changed, 176 insertions(+), 88 deletions(-) diff --git a/aeron-driver/src/main/c/aeron_driver_conductor.c b/aeron-driver/src/main/c/aeron_driver_conductor.c index 7e2a93eb25..5df38ad03d 100644 --- a/aeron-driver/src/main/c/aeron_driver_conductor.c +++ b/aeron-driver/src/main/c/aeron_driver_conductor.c @@ -212,44 +212,6 @@ static bool aeron_driver_conductor_receive_endpoint_has_clashing_timestamp_offse return false; } -static int aeron_driver_conductor_tagged_channels_match(aeron_udp_channel_t *existing, aeron_udp_channel_t *other) -{ - if (!aeron_udp_channel_control_modes_match(existing, other)) - { - AERON_SET_ERR( - EINVAL, - "matching tag %" PRId64 " has mismatched control-mode: %.*s <> %.*s", - other->tag_id, - (int)existing->uri_length, - existing->original_uri, - (int)other->uri_length, - other->original_uri); - return -1; - } - - bool has_matching_endpoints = false; - if (aeron_udp_channel_endpoints_match(existing, other, &has_matching_endpoints) < 0) - { - AERON_APPEND_ERR("%s", ""); - return -1; - } - - if (!has_matching_endpoints) - { - AERON_SET_ERR( - EINVAL, - "matching tag %" PRId64 " has mismatched endpoint or control: %.*s <> %.*s", - other->tag_id, - (int)existing->uri_length, - existing->original_uri, - (int)other->uri_length, - other->original_uri); - return -1; - } - - return 0; -} - static int aeron_driver_conductor_find_existing_receive_channel_endpoint( aeron_driver_conductor_t *conductor, aeron_udp_channel_t *channel, @@ -262,13 +224,15 @@ static int aeron_driver_conductor_find_existing_receive_channel_endpoint( { endpoint = conductor->receive_channel_endpoints.array[i].endpoint; - if (channel->tag_id == endpoint->conductor_fields.udp_channel->tag_id) + bool has_match = false; + if (aeron_receive_channel_endpoint_matches_tag(endpoint, channel, &has_match) < 0) + { + AERON_APPEND_ERR("%s", ""); + return -1; + } + + if (has_match) { - if (aeron_driver_conductor_tagged_channels_match(endpoint->conductor_fields.udp_channel, channel) < 0) - { - AERON_APPEND_ERR("%s", ""); - return -1; - } *result_endpoint = endpoint; return 0; } @@ -298,14 +262,16 @@ static int aeron_driver_conductor_find_existing_send_channel_endpoint( for (size_t i = 0, size = conductor->send_channel_endpoints.length; i < size; i++) { aeron_send_channel_endpoint_t *endpoint = conductor->send_channel_endpoints.array[i].endpoint; - if (channel->tag_id == endpoint->conductor_fields.udp_channel->tag_id) + + bool has_match = false; + if (aeron_send_channel_endpoint_matches_tag(endpoint, channel, &has_match) < 0) { - if (aeron_driver_conductor_tagged_channels_match(endpoint->conductor_fields.udp_channel, channel) < 0) - { - AERON_APPEND_ERR("%s", ""); - return -1; - } + AERON_APPEND_ERR("%s", ""); + return -1; + } + if (has_match) + { *result_endpoint = endpoint; return 0; } diff --git a/aeron-driver/src/main/c/media/aeron_receive_channel_endpoint.c b/aeron-driver/src/main/c/media/aeron_receive_channel_endpoint.c index 5272df8b40..ba5a67a593 100644 --- a/aeron-driver/src/main/c/media/aeron_receive_channel_endpoint.c +++ b/aeron-driver/src/main/c/media/aeron_receive_channel_endpoint.c @@ -636,6 +636,29 @@ int aeron_receive_channel_endpoint_on_rttm( return result; } +int aeron_receive_channel_endpoint_matches_tag( + aeron_receive_channel_endpoint_t *endpoint, + aeron_udp_channel_t *channel, + bool *has_match) +{ + struct sockaddr_storage* current_control_addr = NULL; + if (1 == endpoint->destinations.length && + endpoint->destinations.array[0].destination->conductor_fields.udp_channel == endpoint->conductor_fields.udp_channel && + endpoint->destinations.array[0].destination->conductor_fields.udp_channel->has_explicit_control) + { + current_control_addr = &endpoint->destinations.array[0].destination->current_control_addr; + } + + if (aeron_udp_channel_matches_tag( + channel, endpoint->conductor_fields.udp_channel, current_control_addr, NULL, has_match) < 0) + { + AERON_APPEND_ERR("%s", ""); + return -1; + } + + return 0; +} + void aeron_receive_channel_endpoint_try_remove_endpoint(aeron_receive_channel_endpoint_t *endpoint) { if (0 == endpoint->stream_id_to_refcnt_map.size && diff --git a/aeron-driver/src/main/c/media/aeron_receive_channel_endpoint.h b/aeron-driver/src/main/c/media/aeron_receive_channel_endpoint.h index 724def8592..029442c775 100644 --- a/aeron-driver/src/main/c/media/aeron_receive_channel_endpoint.h +++ b/aeron-driver/src/main/c/media/aeron_receive_channel_endpoint.h @@ -197,6 +197,11 @@ int aeron_receive_channel_endpoint_on_unconnected_stream( size_t length, struct sockaddr_storage *addr); +int aeron_receive_channel_endpoint_matches_tag( + aeron_receive_channel_endpoint_t *endpoint, + aeron_udp_channel_t *channel, + bool *has_match); + void aeron_receive_channel_endpoint_try_remove_endpoint(aeron_receive_channel_endpoint_t *endpoint); int aeron_receive_channel_endpoint_incref_to_stream(aeron_receive_channel_endpoint_t *endpoint, int32_t stream_id); diff --git a/aeron-driver/src/main/c/media/aeron_send_channel_endpoint.c b/aeron-driver/src/main/c/media/aeron_send_channel_endpoint.c index b6d01f435b..e323b23c1c 100644 --- a/aeron-driver/src/main/c/media/aeron_send_channel_endpoint.c +++ b/aeron-driver/src/main/c/media/aeron_send_channel_endpoint.c @@ -654,6 +654,21 @@ int aeron_send_channel_endpoint_resolution_change( return 0; } +int aeron_send_channel_endpoint_matches_tag( + aeron_send_channel_endpoint_t *endpoint, + aeron_udp_channel_t *channel, + bool *has_match) +{ + if (aeron_udp_channel_matches_tag( + channel, endpoint->conductor_fields.udp_channel, NULL, &endpoint->current_data_addr, has_match) < 0) + { + AERON_APPEND_ERR("%s", ""); + return -1; + } + + return 0; +} + extern void aeron_send_channel_endpoint_sender_release(aeron_send_channel_endpoint_t *endpoint); extern bool aeron_send_channel_endpoint_has_sender_released(aeron_send_channel_endpoint_t *endpoint); diff --git a/aeron-driver/src/main/c/media/aeron_send_channel_endpoint.h b/aeron-driver/src/main/c/media/aeron_send_channel_endpoint.h index 09236499ee..2764c3b34e 100644 --- a/aeron-driver/src/main/c/media/aeron_send_channel_endpoint.h +++ b/aeron-driver/src/main/c/media/aeron_send_channel_endpoint.h @@ -152,6 +152,11 @@ int aeron_send_channel_endpoint_resolution_change( const char *endpoint_name, struct sockaddr_storage *new_addr); +int aeron_send_channel_endpoint_matches_tag( + aeron_send_channel_endpoint_t *endpoint, + aeron_udp_channel_t *channel, + bool *has_match); + inline void aeron_send_channel_endpoint_sender_release(aeron_send_channel_endpoint_t *endpoint) { AERON_SET_RELEASE(endpoint->has_sender_released, true); diff --git a/aeron-driver/src/main/c/media/aeron_udp_channel.c b/aeron-driver/src/main/c/media/aeron_udp_channel.c index 46969bdfb2..a31e6bc0af 100644 --- a/aeron-driver/src/main/c/media/aeron_udp_channel.c +++ b/aeron-driver/src/main/c/media/aeron_udp_channel.c @@ -28,6 +28,51 @@ #include "media/aeron_udp_channel.h" #include "command/aeron_control_protocol.h" +static int aeron_udp_channel_endpoints_match_with_override( + aeron_udp_channel_t *channel, + aeron_udp_channel_t *endpoint_channel, + struct sockaddr_storage *local_address, + struct sockaddr_storage *remote_address, + bool *result) +{ + bool cmp = false; + int rc = 0; + + if (aeron_udp_channel_is_wildcard(channel)) + { + *result = true; + return rc; + } + + struct sockaddr_storage *endpoint_remote_data = NULL != remote_address ? remote_address : + &endpoint_channel->remote_data; + struct sockaddr_storage *endpoint_local_data = NULL != local_address ? local_address : + &endpoint_channel->local_data; + + rc = aeron_sockaddr_storage_cmp(&channel->remote_data, endpoint_remote_data, &cmp); + if (rc < 0) + { + AERON_APPEND_ERR("%s", "remote_data"); + return rc; + } + + if (!cmp) + { + *result = cmp; + return 0; + } + + rc = aeron_sockaddr_storage_cmp(&channel->local_data, endpoint_local_data, &cmp); + if (rc < 0) + { + AERON_APPEND_ERR("%s", "local_data"); + return rc; + } + + *result = cmp; + return 0; +} + int aeron_ipv4_multicast_control_address(struct sockaddr_in *data_addr, struct sockaddr_in *control_addr) { uint8_t bytes[sizeof(struct in_addr)]; @@ -515,9 +560,66 @@ void aeron_udp_channel_delete(aeron_udp_channel_t *channel) } } -extern bool aeron_udp_channel_is_wildcard(aeron_udp_channel_t *channel); +int aeron_udp_channel_matches_tag( + aeron_udp_channel_t *channel, + aeron_udp_channel_t *endpoint_channel, + struct sockaddr_storage *local_address, + struct sockaddr_storage *remote_address, + bool *has_match) +{ + if (AERON_URI_INVALID_TAG == channel->tag_id || + AERON_URI_INVALID_TAG == endpoint_channel->tag_id || + channel->tag_id != endpoint_channel->tag_id) + { + *has_match = false; + return 0; + } -extern int aeron_udp_channel_endpoints_match(aeron_udp_channel_t *channel, aeron_udp_channel_t *other, bool *result); + if (!aeron_udp_channel_control_modes_match(channel, endpoint_channel)) + { + *has_match = false; + + AERON_SET_ERR( + EINVAL, + "matching tag %" PRId64 " has mismatched control-mode: %.*s <> %.*s", + channel->tag_id, + (int)channel->uri_length, + channel->original_uri, + (int)endpoint_channel->uri_length, + endpoint_channel->original_uri); + + return -1; + } + + bool addresses_match = false; + if (aeron_udp_channel_endpoints_match_with_override( + channel, endpoint_channel, local_address, remote_address, &addresses_match) < 0) + { + *has_match = false; + + AERON_APPEND_ERR("%s", ""); + return -1; + } + + if (!addresses_match) + { + AERON_SET_ERR( + EINVAL, + "matching tag %" PRId64 " has mismatched endpoint or control: %.*s <> %.*s", + channel->tag_id, + (int)channel->uri_length, + channel->original_uri, + (int)endpoint_channel->uri_length, + endpoint_channel->original_uri); + return -1; + } + + *has_match = true; + return 0; +} + + +extern bool aeron_udp_channel_is_wildcard(aeron_udp_channel_t *channel); extern bool aeron_udp_channel_control_modes_match(aeron_udp_channel_t *channel, aeron_udp_channel_t *other); diff --git a/aeron-driver/src/main/c/media/aeron_udp_channel.h b/aeron-driver/src/main/c/media/aeron_udp_channel.h index 273bb57e78..2be75da8f0 100644 --- a/aeron-driver/src/main/c/media/aeron_udp_channel.h +++ b/aeron-driver/src/main/c/media/aeron_udp_channel.h @@ -86,6 +86,13 @@ int aeron_udp_channel_parse( aeron_udp_channel_t **channel, bool is_destination); +int aeron_udp_channel_matches_tag( + aeron_udp_channel_t *channel, + aeron_udp_channel_t *endpoint_channel, + struct sockaddr_storage *local_address, + struct sockaddr_storage *remote_address, + bool *has_match); + void aeron_udp_channel_delete(aeron_udp_channel_t *channel); inline bool aeron_udp_channel_is_wildcard(aeron_udp_channel_t *channel) @@ -94,44 +101,9 @@ inline bool aeron_udp_channel_is_wildcard(aeron_udp_channel_t *channel) aeron_is_wildcard_addr(&channel->local_data) && aeron_is_wildcard_port(&channel->local_data); } -inline int aeron_udp_channel_endpoints_match(aeron_udp_channel_t *channel, aeron_udp_channel_t *other, bool *result) -{ - bool cmp = false; - int rc = 0; - - if (aeron_udp_channel_is_wildcard(other)) - { - *result = true; - return rc; - } - - rc = aeron_sockaddr_storage_cmp(&channel->remote_data, &other->remote_data, &cmp); - if (rc < 0) - { - AERON_APPEND_ERR("%s", "remote_data"); - return rc; - } - - if (!cmp) - { - *result = cmp; - return 0; - } - - rc = aeron_sockaddr_storage_cmp(&channel->local_data, &other->local_data, &cmp); - if (rc < 0) - { - AERON_APPEND_ERR("%s", "local_data"); - return rc; - } - - *result = cmp; - return 0; -} - inline bool aeron_udp_channel_control_modes_match(aeron_udp_channel_t *channel, aeron_udp_channel_t *other) { - return AERON_UDP_CHANNEL_CONTROL_MODE_NONE == other->control_mode || channel->control_mode == other->control_mode; + return AERON_UDP_CHANNEL_CONTROL_MODE_NONE == channel->control_mode || channel->control_mode == other->control_mode; } inline bool aeron_udp_channel_equals(aeron_udp_channel_t *a, aeron_udp_channel_t *b)