Skip to content

Commit

Permalink
[C] Update C driver to use the same matching logic as the Java driver…
Browse files Browse the repository at this point in the history
… for checking the validity of tagged publications and subscriptions.
  • Loading branch information
mikeb01 committed Jan 17, 2025
1 parent 069010c commit 0f7909b
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 88 deletions.
66 changes: 16 additions & 50 deletions aeron-driver/src/main/c/aeron_driver_conductor.c
Original file line number Diff line number Diff line change
Expand Up @@ -212,44 +212,6 @@ static bool aeron_driver_conductor_receive_endpoint_has_clashing_timestamp_offse
return false;
}

static int aeron_driver_conductor_tagged_channels_match(aeron_udp_channel_t *existing, aeron_udp_channel_t *other)
{
if (!aeron_udp_channel_control_modes_match(existing, other))
{
AERON_SET_ERR(
EINVAL,
"matching tag %" PRId64 " has mismatched control-mode: %.*s <> %.*s",
other->tag_id,
(int)existing->uri_length,
existing->original_uri,
(int)other->uri_length,
other->original_uri);
return -1;
}

bool has_matching_endpoints = false;
if (aeron_udp_channel_endpoints_match(existing, other, &has_matching_endpoints) < 0)
{
AERON_APPEND_ERR("%s", "");
return -1;
}

if (!has_matching_endpoints)
{
AERON_SET_ERR(
EINVAL,
"matching tag %" PRId64 " has mismatched endpoint or control: %.*s <> %.*s",
other->tag_id,
(int)existing->uri_length,
existing->original_uri,
(int)other->uri_length,
other->original_uri);
return -1;
}

return 0;
}

static int aeron_driver_conductor_find_existing_receive_channel_endpoint(
aeron_driver_conductor_t *conductor,
aeron_udp_channel_t *channel,
Expand All @@ -262,13 +224,15 @@ static int aeron_driver_conductor_find_existing_receive_channel_endpoint(
{
endpoint = conductor->receive_channel_endpoints.array[i].endpoint;

if (channel->tag_id == endpoint->conductor_fields.udp_channel->tag_id)
bool has_match = false;
if (aeron_receive_channel_endpoint_matches_tag(endpoint, channel, &has_match) < 0)
{
AERON_APPEND_ERR("%s", "");
return -1;
}

if (has_match)
{
if (aeron_driver_conductor_tagged_channels_match(endpoint->conductor_fields.udp_channel, channel) < 0)
{
AERON_APPEND_ERR("%s", "");
return -1;
}
*result_endpoint = endpoint;
return 0;
}
Expand Down Expand Up @@ -298,14 +262,16 @@ static int aeron_driver_conductor_find_existing_send_channel_endpoint(
for (size_t i = 0, size = conductor->send_channel_endpoints.length; i < size; i++)
{
aeron_send_channel_endpoint_t *endpoint = conductor->send_channel_endpoints.array[i].endpoint;
if (channel->tag_id == endpoint->conductor_fields.udp_channel->tag_id)

bool has_match = false;
if (aeron_send_channel_endpoint_matches_tag(endpoint, channel, &has_match) < 0)
{
if (aeron_driver_conductor_tagged_channels_match(endpoint->conductor_fields.udp_channel, channel) < 0)
{
AERON_APPEND_ERR("%s", "");
return -1;
}
AERON_APPEND_ERR("%s", "");
return -1;
}

if (has_match)
{
*result_endpoint = endpoint;
return 0;
}
Expand Down
23 changes: 23 additions & 0 deletions aeron-driver/src/main/c/media/aeron_receive_channel_endpoint.c
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,29 @@ int aeron_receive_channel_endpoint_on_rttm(
return result;
}

int aeron_receive_channel_endpoint_matches_tag(
aeron_receive_channel_endpoint_t *endpoint,
aeron_udp_channel_t *channel,
bool *has_match)
{
struct sockaddr_storage* current_control_addr = NULL;
if (1 == endpoint->destinations.length &&
endpoint->destinations.array[0].destination->conductor_fields.udp_channel == endpoint->conductor_fields.udp_channel &&
endpoint->destinations.array[0].destination->conductor_fields.udp_channel->has_explicit_control)
{
current_control_addr = &endpoint->destinations.array[0].destination->current_control_addr;
}

if (aeron_udp_channel_matches_tag(
channel, endpoint->conductor_fields.udp_channel, current_control_addr, NULL, has_match) < 0)
{
AERON_APPEND_ERR("%s", "");
return -1;
}

return 0;
}

void aeron_receive_channel_endpoint_try_remove_endpoint(aeron_receive_channel_endpoint_t *endpoint)
{
if (0 == endpoint->stream_id_to_refcnt_map.size &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,11 @@ int aeron_receive_channel_endpoint_on_unconnected_stream(
size_t length,
struct sockaddr_storage *addr);

int aeron_receive_channel_endpoint_matches_tag(
aeron_receive_channel_endpoint_t *endpoint,
aeron_udp_channel_t *channel,
bool *has_match);

void aeron_receive_channel_endpoint_try_remove_endpoint(aeron_receive_channel_endpoint_t *endpoint);

int aeron_receive_channel_endpoint_incref_to_stream(aeron_receive_channel_endpoint_t *endpoint, int32_t stream_id);
Expand Down
15 changes: 15 additions & 0 deletions aeron-driver/src/main/c/media/aeron_send_channel_endpoint.c
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,21 @@ int aeron_send_channel_endpoint_resolution_change(
return 0;
}

int aeron_send_channel_endpoint_matches_tag(
aeron_send_channel_endpoint_t *endpoint,
aeron_udp_channel_t *channel,
bool *has_match)
{
if (aeron_udp_channel_matches_tag(
channel, endpoint->conductor_fields.udp_channel, NULL, &endpoint->current_data_addr, has_match) < 0)
{
AERON_APPEND_ERR("%s", "");
return -1;
}

return 0;
}

extern void aeron_send_channel_endpoint_sender_release(aeron_send_channel_endpoint_t *endpoint);

extern bool aeron_send_channel_endpoint_has_sender_released(aeron_send_channel_endpoint_t *endpoint);
Expand Down
5 changes: 5 additions & 0 deletions aeron-driver/src/main/c/media/aeron_send_channel_endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ int aeron_send_channel_endpoint_resolution_change(
const char *endpoint_name,
struct sockaddr_storage *new_addr);

int aeron_send_channel_endpoint_matches_tag(
aeron_send_channel_endpoint_t *endpoint,
aeron_udp_channel_t *channel,
bool *has_match);

inline void aeron_send_channel_endpoint_sender_release(aeron_send_channel_endpoint_t *endpoint)
{
AERON_SET_RELEASE(endpoint->has_sender_released, true);
Expand Down
106 changes: 104 additions & 2 deletions aeron-driver/src/main/c/media/aeron_udp_channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,51 @@
#include "media/aeron_udp_channel.h"
#include "command/aeron_control_protocol.h"

static int aeron_udp_channel_endpoints_match_with_override(
aeron_udp_channel_t *channel,
aeron_udp_channel_t *endpoint_channel,
struct sockaddr_storage *local_address,
struct sockaddr_storage *remote_address,
bool *result)
{
bool cmp = false;
int rc = 0;

if (aeron_udp_channel_is_wildcard(channel))
{
*result = true;
return rc;
}

struct sockaddr_storage *endpoint_remote_data = NULL != remote_address ? remote_address :
&endpoint_channel->remote_data;
struct sockaddr_storage *endpoint_local_data = NULL != local_address ? local_address :
&endpoint_channel->local_data;

rc = aeron_sockaddr_storage_cmp(&channel->remote_data, endpoint_remote_data, &cmp);
if (rc < 0)
{
AERON_APPEND_ERR("%s", "remote_data");
return rc;
}

if (!cmp)
{
*result = cmp;
return 0;
}

rc = aeron_sockaddr_storage_cmp(&channel->local_data, endpoint_local_data, &cmp);
if (rc < 0)
{
AERON_APPEND_ERR("%s", "local_data");
return rc;
}

*result = cmp;
return 0;
}

int aeron_ipv4_multicast_control_address(struct sockaddr_in *data_addr, struct sockaddr_in *control_addr)
{
uint8_t bytes[sizeof(struct in_addr)];
Expand Down Expand Up @@ -515,9 +560,66 @@ void aeron_udp_channel_delete(aeron_udp_channel_t *channel)
}
}

extern bool aeron_udp_channel_is_wildcard(aeron_udp_channel_t *channel);
int aeron_udp_channel_matches_tag(
aeron_udp_channel_t *channel,
aeron_udp_channel_t *endpoint_channel,
struct sockaddr_storage *local_address,
struct sockaddr_storage *remote_address,
bool *has_match)
{
if (AERON_URI_INVALID_TAG == channel->tag_id ||
AERON_URI_INVALID_TAG == endpoint_channel->tag_id ||
channel->tag_id != endpoint_channel->tag_id)
{
*has_match = false;
return 0;
}

extern int aeron_udp_channel_endpoints_match(aeron_udp_channel_t *channel, aeron_udp_channel_t *other, bool *result);
if (!aeron_udp_channel_control_modes_match(channel, endpoint_channel))
{
*has_match = false;

AERON_SET_ERR(
EINVAL,
"matching tag %" PRId64 " has mismatched control-mode: %.*s <> %.*s",
channel->tag_id,
(int)channel->uri_length,
channel->original_uri,
(int)endpoint_channel->uri_length,
endpoint_channel->original_uri);

return -1;
}

bool addresses_match = false;
if (aeron_udp_channel_endpoints_match_with_override(
channel, endpoint_channel, local_address, remote_address, &addresses_match) < 0)
{
*has_match = false;

AERON_APPEND_ERR("%s", "");
return -1;
}

if (!addresses_match)
{
AERON_SET_ERR(
EINVAL,
"matching tag %" PRId64 " has mismatched endpoint or control: %.*s <> %.*s",
channel->tag_id,
(int)channel->uri_length,
channel->original_uri,
(int)endpoint_channel->uri_length,
endpoint_channel->original_uri);
return -1;
}

*has_match = true;
return 0;
}


extern bool aeron_udp_channel_is_wildcard(aeron_udp_channel_t *channel);

extern bool aeron_udp_channel_control_modes_match(aeron_udp_channel_t *channel, aeron_udp_channel_t *other);

Expand Down
44 changes: 8 additions & 36 deletions aeron-driver/src/main/c/media/aeron_udp_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,13 @@ int aeron_udp_channel_parse(
aeron_udp_channel_t **channel,
bool is_destination);

int aeron_udp_channel_matches_tag(
aeron_udp_channel_t *channel,
aeron_udp_channel_t *endpoint_channel,
struct sockaddr_storage *local_address,
struct sockaddr_storage *remote_address,
bool *has_match);

void aeron_udp_channel_delete(aeron_udp_channel_t *channel);

inline bool aeron_udp_channel_is_wildcard(aeron_udp_channel_t *channel)
Expand All @@ -94,44 +101,9 @@ inline bool aeron_udp_channel_is_wildcard(aeron_udp_channel_t *channel)
aeron_is_wildcard_addr(&channel->local_data) && aeron_is_wildcard_port(&channel->local_data);
}

inline int aeron_udp_channel_endpoints_match(aeron_udp_channel_t *channel, aeron_udp_channel_t *other, bool *result)
{
bool cmp = false;
int rc = 0;

if (aeron_udp_channel_is_wildcard(other))
{
*result = true;
return rc;
}

rc = aeron_sockaddr_storage_cmp(&channel->remote_data, &other->remote_data, &cmp);
if (rc < 0)
{
AERON_APPEND_ERR("%s", "remote_data");
return rc;
}

if (!cmp)
{
*result = cmp;
return 0;
}

rc = aeron_sockaddr_storage_cmp(&channel->local_data, &other->local_data, &cmp);
if (rc < 0)
{
AERON_APPEND_ERR("%s", "local_data");
return rc;
}

*result = cmp;
return 0;
}

inline bool aeron_udp_channel_control_modes_match(aeron_udp_channel_t *channel, aeron_udp_channel_t *other)
{
return AERON_UDP_CHANNEL_CONTROL_MODE_NONE == other->control_mode || channel->control_mode == other->control_mode;
return AERON_UDP_CHANNEL_CONTROL_MODE_NONE == channel->control_mode || channel->control_mode == other->control_mode;
}

inline bool aeron_udp_channel_equals(aeron_udp_channel_t *a, aeron_udp_channel_t *b)
Expand Down

0 comments on commit 0f7909b

Please sign in to comment.