diff --git a/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java b/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java index 89997135d1..f45e939903 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java +++ b/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java @@ -909,7 +909,7 @@ void onAddRcvDestination(final long registrationId, final String destinationChan receiveChannelEndpoint.validateAllowsDestinationControl(); final UdpChannel udpChannel = UdpChannel.parse(destinationChannel); - final ReceiveDestinationUdpTransport transport = new ReceiveDestinationUdpTransport(udpChannel, ctx); + final ReceiveDestinationTransport transport = new ReceiveDestinationTransport(udpChannel, ctx); receiverProxy.addDestination(receiveChannelEndpoint, transport); clientProxy.operationSucceeded(correlationId); diff --git a/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java b/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java index 6d9684cfdc..fe1b5fbdf1 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java +++ b/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java @@ -20,7 +20,7 @@ import io.aeron.driver.buffer.RawLog; import io.aeron.driver.media.ImageConnection; import io.aeron.driver.media.ReceiveChannelEndpoint; -import io.aeron.driver.media.ReceiveDestinationUdpTransport; +import io.aeron.driver.media.ReceiveDestinationTransport; import io.aeron.driver.reports.LossReport; import io.aeron.driver.status.SystemCounters; import io.aeron.logbuffer.LogBufferDescriptor; @@ -427,7 +427,7 @@ void activate() * @param transportIndex from which packets will arrive. * @param transport from which packets will arrive. */ - void addDestination(final int transportIndex, final ReceiveDestinationUdpTransport transport) + void addDestination(final int transportIndex, final ReceiveDestinationTransport transport) { imageConnections = ArrayUtil.ensureCapacity(imageConnections, transportIndex + 1); diff --git a/aeron-driver/src/main/java/io/aeron/driver/Receiver.java b/aeron-driver/src/main/java/io/aeron/driver/Receiver.java index 372844c1b2..4ad4470387 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/Receiver.java +++ b/aeron-driver/src/main/java/io/aeron/driver/Receiver.java @@ -17,7 +17,7 @@ import io.aeron.driver.media.DataTransportPoller; import io.aeron.driver.media.ReceiveChannelEndpoint; -import io.aeron.driver.media.ReceiveDestinationUdpTransport; +import io.aeron.driver.media.ReceiveDestinationTransport; import io.aeron.driver.media.UdpChannel; import org.agrona.CloseHelper; import org.agrona.collections.ArrayListUtil; @@ -148,10 +148,8 @@ public void onRegisterReceiveChannelEndpoint(final ReceiveChannelEndpoint channe if (channelEndpoint.hasExplicitControl()) { - addPendingSetupMessage( - 0, 0, 0, channelEndpoint, true, channelEndpoint.explicitControlAddress()); - channelEndpoint.sendSetupElicitingStatusMessage( - 0, channelEndpoint.explicitControlAddress(), 0, 0); + addPendingSetupMessage(0, 0, 0, channelEndpoint, true, channelEndpoint.explicitControlAddress()); + channelEndpoint.sendSetupElicitingStatusMessage(0, channelEndpoint.explicitControlAddress(), 0, 0); } } else @@ -184,7 +182,7 @@ public void onRemoveCoolDown(final ReceiveChannelEndpoint channelEndpoint, final } public void onAddDestination( - final ReceiveChannelEndpoint channelEndpoint, final ReceiveDestinationUdpTransport transport) + final ReceiveChannelEndpoint channelEndpoint, final ReceiveDestinationTransport transport) { transport.openChannel(conductorProxy, channelEndpoint.statusIndicatorCounter()); @@ -194,10 +192,8 @@ public void onAddDestination( if (transport.hasExplicitControl()) { - addPendingSetupMessage( - 0, 0, transportIndex, channelEndpoint, true, transport.explicitControlAddress()); - channelEndpoint.sendSetupElicitingStatusMessage( - transportIndex, transport.explicitControlAddress(), 0, 0); + addPendingSetupMessage(0, 0, transportIndex, channelEndpoint, true, transport.explicitControlAddress()); + channelEndpoint.sendSetupElicitingStatusMessage(transportIndex, transport.explicitControlAddress(), 0, 0); } for (final PublicationImage image : publicationImages) @@ -209,14 +205,13 @@ public void onAddDestination( } } - public void onRemoveDestination( - final ReceiveChannelEndpoint channelEndpoint, final UdpChannel udpChannel) + public void onRemoveDestination(final ReceiveChannelEndpoint channelEndpoint, final UdpChannel udpChannel) { final int transportIndex = channelEndpoint.destination(udpChannel); if (ArrayUtil.UNKNOWN_INDEX != transportIndex) { - final ReceiveDestinationUdpTransport transport = channelEndpoint.destination(transportIndex); + final ReceiveDestinationTransport transport = channelEndpoint.destination(transportIndex); dataTransportPoller.cancelRead(channelEndpoint, transport); channelEndpoint.removeDestination(transportIndex); diff --git a/aeron-driver/src/main/java/io/aeron/driver/ReceiverProxy.java b/aeron-driver/src/main/java/io/aeron/driver/ReceiverProxy.java index c975c9b094..aa4fbe8956 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/ReceiverProxy.java +++ b/aeron-driver/src/main/java/io/aeron/driver/ReceiverProxy.java @@ -16,7 +16,7 @@ package io.aeron.driver; import io.aeron.driver.media.ReceiveChannelEndpoint; -import io.aeron.driver.media.ReceiveDestinationUdpTransport; +import io.aeron.driver.media.ReceiveDestinationTransport; import io.aeron.driver.media.UdpChannel; import org.agrona.concurrent.status.AtomicCounter; @@ -151,7 +151,7 @@ public void removeCoolDown(final ReceiveChannelEndpoint channelEndpoint, final i } public void addDestination( - final ReceiveChannelEndpoint channelEndpoint, final ReceiveDestinationUdpTransport transport) + final ReceiveChannelEndpoint channelEndpoint, final ReceiveDestinationTransport transport) { if (notConcurrent()) { diff --git a/aeron-driver/src/main/java/io/aeron/driver/media/MultiRcvDestination.java b/aeron-driver/src/main/java/io/aeron/driver/media/MultiRcvDestination.java index 71f0262520..74a7103715 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/media/MultiRcvDestination.java +++ b/aeron-driver/src/main/java/io/aeron/driver/media/MultiRcvDestination.java @@ -28,12 +28,12 @@ final class MultiRcvDestination { - private static final ReceiveDestinationUdpTransport[] EMPTY_TRANSPORTS = new ReceiveDestinationUdpTransport[0]; + private static final ReceiveDestinationTransport[] EMPTY_TRANSPORTS = new ReceiveDestinationTransport[0]; private final long destinationEndpointTimeoutNs; private final ErrorHandler errorHandler; private final NanoClock nanoClock; - private ReceiveDestinationUdpTransport[] transports = EMPTY_TRANSPORTS; + private ReceiveDestinationTransport[] transports = EMPTY_TRANSPORTS; private int numDestinations = 0; MultiRcvDestination(final NanoClock nanoClock, final long timeoutNs, final ErrorHandler errorHandler) @@ -45,7 +45,7 @@ final class MultiRcvDestination void close(final DataTransportPoller poller) { - for (final ReceiveDestinationUdpTransport transport : transports) + for (final ReceiveDestinationTransport transport : transports) { AeronCloseHelper.close(errorHandler, transport); if (null != poller) @@ -55,7 +55,7 @@ void close(final DataTransportPoller poller) } } - int addDestination(final ReceiveDestinationUdpTransport transport) + int addDestination(final ReceiveDestinationTransport transport) { int index = transports.length; @@ -86,19 +86,19 @@ boolean hasDestination(final int transportIndex) return numDestinations > transportIndex && null != transports[transportIndex]; } - ReceiveDestinationUdpTransport transport(final int transportIndex) + ReceiveDestinationTransport transport(final int transportIndex) { return transports[transportIndex]; } int transport(final UdpChannel udpChannel) { - final ReceiveDestinationUdpTransport[] transports = this.transports; + final ReceiveDestinationTransport[] transports = this.transports; int index = ArrayUtil.UNKNOWN_INDEX; for (int i = 0, length = transports.length; i < length; i++) { - final ReceiveDestinationUdpTransport transport = transports[i]; + final ReceiveDestinationTransport transport = transports[i]; if (null != transport && transport.udpChannel().equals(udpChannel)) { @@ -116,7 +116,7 @@ int sendToAll( final int bufferPosition, final int bytesToSend) { - final ReceiveDestinationUdpTransport[] transports = this.transports; + final ReceiveDestinationTransport[] transports = this.transports; final long nowNs = nanoClock.nanoTime(); int minBytesSent = bytesToSend; diff --git a/aeron-driver/src/main/java/io/aeron/driver/media/MultiDestination.java b/aeron-driver/src/main/java/io/aeron/driver/media/MultiSndDestination.java similarity index 96% rename from aeron-driver/src/main/java/io/aeron/driver/media/MultiDestination.java rename to aeron-driver/src/main/java/io/aeron/driver/media/MultiSndDestination.java index e8af1378c7..75931f8c0b 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/media/MultiDestination.java +++ b/aeron-driver/src/main/java/io/aeron/driver/media/MultiSndDestination.java @@ -26,7 +26,7 @@ import static io.aeron.driver.media.UdpChannelTransport.sendError; -abstract class MultiDestination +abstract class MultiSndDestination { abstract int send(DatagramChannel channel, ByteBuffer buffer, SendChannelEndpoint channelEndpoint, int bytesToSend); @@ -72,7 +72,7 @@ static int send( } } -class ManualMultiDestination extends MultiDestination +class ManualSndMultiDestination extends MultiSndDestination { private static final InetSocketAddress[] EMPTY_DESTINATIONS = new InetSocketAddress[0]; @@ -149,7 +149,7 @@ void removeDestination(final InetSocketAddress address) } } -class DynamicMultiDestination extends MultiDestination +class DynamicSndMultiDestination extends MultiSndDestination { private static final Destination[] EMPTY_DESTINATIONS = new Destination[0]; @@ -157,7 +157,7 @@ class DynamicMultiDestination extends MultiDestination private final CachedNanoClock nanoClock; private Destination[] destinations = EMPTY_DESTINATIONS; - DynamicMultiDestination(final CachedNanoClock nanoClock, final long timeout) + DynamicSndMultiDestination(final CachedNanoClock nanoClock, final long timeout) { this.nanoClock = nanoClock; this.destinationTimeoutNs = timeout; diff --git a/aeron-driver/src/main/java/io/aeron/driver/media/ReceiveChannelEndpoint.java b/aeron-driver/src/main/java/io/aeron/driver/media/ReceiveChannelEndpoint.java index 2a4b8576ae..d0d002912c 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/media/ReceiveChannelEndpoint.java +++ b/aeron-driver/src/main/java/io/aeron/driver/media/ReceiveChannelEndpoint.java @@ -357,7 +357,7 @@ else if (0 == transportIndex) } } - public int addDestination(final ReceiveDestinationUdpTransport transport) + public int addDestination(final ReceiveDestinationTransport transport) { return multiRcvDestination.addDestination(transport); } @@ -372,7 +372,7 @@ public int destination(final UdpChannel udpChannel) return multiRcvDestination.transport(udpChannel); } - public ReceiveDestinationUdpTransport destination(final int transportIndex) + public ReceiveDestinationTransport destination(final int transportIndex) { return multiRcvDestination.transport(transportIndex); } diff --git a/aeron-driver/src/main/java/io/aeron/driver/media/ReceiveDestinationUdpTransport.java b/aeron-driver/src/main/java/io/aeron/driver/media/ReceiveDestinationTransport.java similarity index 91% rename from aeron-driver/src/main/java/io/aeron/driver/media/ReceiveDestinationUdpTransport.java rename to aeron-driver/src/main/java/io/aeron/driver/media/ReceiveDestinationTransport.java index 926c5c3068..03e6b6d63d 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/media/ReceiveDestinationUdpTransport.java +++ b/aeron-driver/src/main/java/io/aeron/driver/media/ReceiveDestinationTransport.java @@ -25,9 +25,9 @@ /** * Destination endpoint representation for reception into an image from a UDP transport. */ -public class ReceiveDestinationUdpTransport extends UdpChannelTransport +public class ReceiveDestinationTransport extends UdpChannelTransport { - public ReceiveDestinationUdpTransport(final UdpChannel udpChannel, final MediaDriver.Context context) + public ReceiveDestinationTransport(final UdpChannel udpChannel, final MediaDriver.Context context) { super(udpChannel, udpChannel.remoteData(), udpChannel.remoteData(), null, context); } diff --git a/aeron-driver/src/main/java/io/aeron/driver/media/SendChannelEndpoint.java b/aeron-driver/src/main/java/io/aeron/driver/media/SendChannelEndpoint.java index f1ac21bde4..e7aafbb8b3 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/media/SendChannelEndpoint.java +++ b/aeron-driver/src/main/java/io/aeron/driver/media/SendChannelEndpoint.java @@ -46,7 +46,7 @@ public class SendChannelEndpoint extends UdpChannelTransport private int refCount = 0; private final BiInt2ObjectMap publicationBySessionAndStreamId = new BiInt2ObjectMap<>(); - private final MultiDestination multiDestination; + private final MultiSndDestination multiSndDestination; private final AtomicCounter statusMessagesReceived; private final AtomicCounter nakMessagesReceived; private final AtomicCounter statusIndicator; @@ -65,17 +65,17 @@ public SendChannelEndpoint( statusMessagesReceived = context.systemCounters().get(STATUS_MESSAGES_RECEIVED); this.statusIndicator = statusIndicator; - MultiDestination multiDestination = null; + MultiSndDestination multiSndDestination = null; if (udpChannel.isManualControlMode()) { - multiDestination = new ManualMultiDestination(); + multiSndDestination = new ManualSndMultiDestination(); } else if (udpChannel.isDynamicControlMode() || udpChannel.hasExplicitControl()) { - multiDestination = new DynamicMultiDestination(context.cachedNanoClock(), DESTINATION_TIMEOUT); + multiSndDestination = new DynamicSndMultiDestination(context.cachedNanoClock(), DESTINATION_TIMEOUT); } - this.multiDestination = multiDestination; + this.multiSndDestination = multiSndDestination; } public void decRef() @@ -185,7 +185,7 @@ public int send(final ByteBuffer buffer) { final int bytesToSend = buffer.remaining(); - if (null == multiDestination) + if (null == multiSndDestination) { try { @@ -205,7 +205,7 @@ public int send(final ByteBuffer buffer) } else { - bytesSent = multiDestination.send(sendDatagramChannel, buffer, this, bytesToSend); + bytesSent = multiSndDestination.send(sendDatagramChannel, buffer, this, bytesToSend); } } @@ -222,9 +222,9 @@ public void onStatusMessage( final int streamId = msg.streamId(); final NetworkPublication publication = publicationBySessionAndStreamId.get(sessionId, streamId); - if (null != multiDestination) + if (null != multiSndDestination) { - multiDestination.onStatusMessage(msg, srcAddress); + multiSndDestination.onStatusMessage(msg, srcAddress); if (0 == sessionId && 0 == streamId && SEND_SETUP_FLAG == (msg.flags() & SEND_SETUP_FLAG)) { @@ -279,7 +279,7 @@ public void onRttMeasurement( public void validateAllowsManualControl() { - if (!(multiDestination instanceof ManualMultiDestination)) + if (!(multiSndDestination instanceof ManualSndMultiDestination)) { throw new ControlProtocolException(ErrorCode.INVALID_CHANNEL, "channel does not allow manual control"); } @@ -287,11 +287,11 @@ public void validateAllowsManualControl() public void addDestination(final InetSocketAddress address) { - multiDestination.addDestination(address); + multiSndDestination.addDestination(address); } public void removeDestination(final InetSocketAddress address) { - multiDestination.removeDestination(address); + multiSndDestination.removeDestination(address); } }