diff --git a/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java b/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java index 580d15a406..9184229a65 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java +++ b/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java @@ -2127,8 +2127,7 @@ private SendChannelEndpoint findExistingSendChannelEndpoint(final UdpChannel udp { for (final SendChannelEndpoint endpoint : sendChannelEndpointByChannelMap.values()) { - final UdpChannel endpointUdpChannel = endpoint.udpChannel(); - if (endpointUdpChannel.matchesTag(udpChannel)) + if (endpoint.matchesTag(udpChannel)) { return endpoint; } diff --git a/aeron-driver/src/main/java/io/aeron/driver/media/ReceiveChannelEndpoint.java b/aeron-driver/src/main/java/io/aeron/driver/media/ReceiveChannelEndpoint.java index b105646780..fc22105fc9 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/media/ReceiveChannelEndpoint.java +++ b/aeron-driver/src/main/java/io/aeron/driver/media/ReceiveChannelEndpoint.java @@ -619,7 +619,7 @@ public long tag() */ public boolean matchesTag(final UdpChannel udpChannel) { - return super.udpChannel.matchesTag(udpChannel); + return udpChannel.matchesTag(super.udpChannel, currentControlAddress, null); } /** diff --git a/aeron-driver/src/main/java/io/aeron/driver/media/SendChannelEndpoint.java b/aeron-driver/src/main/java/io/aeron/driver/media/SendChannelEndpoint.java index 5e1a8d4b10..e16da599b4 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/media/SendChannelEndpoint.java +++ b/aeron-driver/src/main/java/io/aeron/driver/media/SendChannelEndpoint.java @@ -634,6 +634,17 @@ private void applyChannelSendTimestamp(final ByteBuffer buffer) } } } + + /** + * Does the channel have a matching tag? + * + * @param udpChannel with tag to match against. + * @return true if the channel matches on tag identity. + */ + public boolean matchesTag(final UdpChannel udpChannel) + { + return udpChannel.matchesTag(super.udpChannel, null, connectAddress); + } } abstract class MultiSndDestinationLhsPadding diff --git a/aeron-driver/src/main/java/io/aeron/driver/media/UdpChannel.java b/aeron-driver/src/main/java/io/aeron/driver/media/UdpChannel.java index 0be70be071..6963f1f2c6 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/media/UdpChannel.java +++ b/aeron-driver/src/main/java/io/aeron/driver/media/UdpChannel.java @@ -775,22 +775,27 @@ public Long nakDelayNs() * Does this channel have a tag match to another channel having INADDR_ANY endpoints. * * @param udpChannel to match against. + * @param localAddress local address override to use for this channel. + * @param remoteAddress remote address override to use for this channel. * @return true if there is a match otherwise false. */ - public boolean matchesTag(final UdpChannel udpChannel) + public boolean matchesTag( + final UdpChannel udpChannel, + final InetSocketAddress localAddress, + final InetSocketAddress remoteAddress) { if (!hasTag || !udpChannel.hasTag() || tag != udpChannel.tag()) { return false; } - if (!matchesControlMode(udpChannel)) + if (!hasMatchingControlMode(udpChannel)) { throw new IllegalArgumentException( "matching tag=" + tag + " has mismatched control-mode: " + uriStr + " <> " + udpChannel.uriStr); } - if (!hasMatchingAddress(udpChannel)) + if (!hasMatchingAddress(udpChannel, localAddress, remoteAddress)) { throw new IllegalArgumentException( "matching tag=" + tag + " has mismatched endpoint or control: " + uriStr + " <> " + udpChannel.uriStr); @@ -799,24 +804,31 @@ public boolean matchesTag(final UdpChannel udpChannel) return true; } - private boolean hasMatchingAddress(final UdpChannel udpChannel) + private boolean isWildcard() { - final boolean otherChannelIsWildcard = udpChannel.remoteData().getAddress().isAnyLocalAddress() && - udpChannel.remoteData().getPort() == 0 && - udpChannel.localData().getAddress().isAnyLocalAddress() && - udpChannel.localData().getPort() == 0; - - final boolean otherChannelMatches = udpChannel.remoteData().getAddress().equals(remoteData.getAddress()) && - udpChannel.remoteData().getPort() == remoteData.getPort() && - udpChannel.localData().getAddress().equals(localData.getAddress()) && - udpChannel.localData().getPort() == localData.getPort(); + return remoteData.getAddress().isAnyLocalAddress() && + remoteData.getPort() == 0 && + localData.getAddress().isAnyLocalAddress() && + localData.getPort() == 0; + } - return otherChannelIsWildcard || otherChannelMatches; + private boolean hasMatchingControlMode(final UdpChannel udpChannel) + { + return controlMode() == ControlMode.NONE || controlMode() == udpChannel.controlMode(); } - private boolean matchesControlMode(final UdpChannel udpChannel) + private boolean hasMatchingAddress( + final UdpChannel udpChannel, + final InetSocketAddress localAddress, + final InetSocketAddress remoteAddress) { - return udpChannel.controlMode() == ControlMode.NONE || controlMode() == udpChannel.controlMode(); + final InetSocketAddress otherLocalData = localAddress != null ? localAddress : udpChannel.localData(); + final InetSocketAddress otherRemoteData = remoteAddress != null ? remoteAddress : udpChannel.remoteData(); + + return isWildcard() || remoteData().getAddress().equals(otherRemoteData.getAddress()) && + remoteData().getPort() == otherRemoteData.getPort() && + localData().getAddress().equals(otherLocalData.getAddress()) && + localData().getPort() == otherLocalData.getPort(); } /** diff --git a/aeron-system-tests/src/test/java/io/aeron/NameReResolutionTest.java b/aeron-system-tests/src/test/java/io/aeron/NameReResolutionTest.java index 926ca0ce44..579eafc09c 100644 --- a/aeron-system-tests/src/test/java/io/aeron/NameReResolutionTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/NameReResolutionTest.java @@ -579,6 +579,54 @@ void shouldReResolveUnicastAddressWhenSendChannelEndpointIsReused() } } + @Test + @SlowTest + @InterruptAfter(10) + void shouldHandleTaggedSubscriptionsAddressWithReResolutionToMdcPublications() + { + final String taggedUri = SUBSCRIPTION_DYNAMIC_MDC_URI + "|tags=22701"; + + subscription = client.addSubscription(taggedUri, STREAM_ID); + assertFalse(subscription.isConnected()); + + assertTrue(updateNameResolutionStatus(countersReader, CONTROL_NAME, USE_RE_RESOLUTION_HOST)); + + publication = client.addPublication(SECOND_PUBLICATION_DYNAMIC_MDC_URI, STREAM_ID); + + Tests.awaitConnected(subscription); + + try (Subscription taggedSub1 = client.addSubscription("aeron:udp?tags=22701", STREAM_ID); + Subscription taggedSub2 = client.addSubscription(taggedUri, STREAM_ID)) + { + Tests.awaitConnected(taggedSub1); + Tests.awaitConnected(taggedSub2); + } + } + + @Test + @SlowTest + @InterruptAfter(10) + void shouldHandleTaggedPublication() + { + final String taggedUri = PUBLICATION_URI + "|tags=22701"; + + publication = client.addPublication(taggedUri, STREAM_ID); + assertFalse(publication.isConnected()); + + assertTrue(updateNameResolutionStatus(countersReader, ENDPOINT_NAME, USE_RE_RESOLUTION_HOST)); + + subscription = client.addSubscription(SECOND_SUBSCRIPTION_URI, STREAM_ID); + + Tests.awaitConnected(publication); + + try (Publication taggedPub1 = client.addPublication("aeron:udp?tags=22701", STREAM_ID); + Publication taggedPub2 = client.addPublication(taggedUri, STREAM_ID)) + { + Tests.awaitConnected(taggedPub1); + Tests.awaitConnected(taggedPub2); + } + } + private static void assumeBindAddressAvailable(final String address) { final String message = NetworkTestingUtil.isBindAddressAvailable(address);