Skip to content

Commit

Permalink
[Java] Naming consistency.
Browse files Browse the repository at this point in the history
  • Loading branch information
mjpt777 committed Feb 13, 2020
1 parent 3a6d130 commit 12ab5de
Show file tree
Hide file tree
Showing 9 changed files with 41 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -909,7 +909,7 @@ void onAddRcvDestination(final long registrationId, final String destinationChan
receiveChannelEndpoint.validateAllowsDestinationControl();

final UdpChannel udpChannel = UdpChannel.parse(destinationChannel);
final ReceiveDestinationUdpTransport transport = new ReceiveDestinationUdpTransport(udpChannel, ctx);
final ReceiveDestinationTransport transport = new ReceiveDestinationTransport(udpChannel, ctx);

receiverProxy.addDestination(receiveChannelEndpoint, transport);
clientProxy.operationSucceeded(correlationId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import io.aeron.driver.buffer.RawLog;
import io.aeron.driver.media.ImageConnection;
import io.aeron.driver.media.ReceiveChannelEndpoint;
import io.aeron.driver.media.ReceiveDestinationUdpTransport;
import io.aeron.driver.media.ReceiveDestinationTransport;
import io.aeron.driver.reports.LossReport;
import io.aeron.driver.status.SystemCounters;
import io.aeron.logbuffer.LogBufferDescriptor;
Expand Down Expand Up @@ -427,7 +427,7 @@ void activate()
* @param transportIndex from which packets will arrive.
* @param transport from which packets will arrive.
*/
void addDestination(final int transportIndex, final ReceiveDestinationUdpTransport transport)
void addDestination(final int transportIndex, final ReceiveDestinationTransport transport)
{
imageConnections = ArrayUtil.ensureCapacity(imageConnections, transportIndex + 1);

Expand Down
21 changes: 8 additions & 13 deletions aeron-driver/src/main/java/io/aeron/driver/Receiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import io.aeron.driver.media.DataTransportPoller;
import io.aeron.driver.media.ReceiveChannelEndpoint;
import io.aeron.driver.media.ReceiveDestinationUdpTransport;
import io.aeron.driver.media.ReceiveDestinationTransport;
import io.aeron.driver.media.UdpChannel;
import org.agrona.CloseHelper;
import org.agrona.collections.ArrayListUtil;
Expand Down Expand Up @@ -148,10 +148,8 @@ public void onRegisterReceiveChannelEndpoint(final ReceiveChannelEndpoint channe

if (channelEndpoint.hasExplicitControl())
{
addPendingSetupMessage(
0, 0, 0, channelEndpoint, true, channelEndpoint.explicitControlAddress());
channelEndpoint.sendSetupElicitingStatusMessage(
0, channelEndpoint.explicitControlAddress(), 0, 0);
addPendingSetupMessage(0, 0, 0, channelEndpoint, true, channelEndpoint.explicitControlAddress());
channelEndpoint.sendSetupElicitingStatusMessage(0, channelEndpoint.explicitControlAddress(), 0, 0);
}
}
else
Expand Down Expand Up @@ -184,7 +182,7 @@ public void onRemoveCoolDown(final ReceiveChannelEndpoint channelEndpoint, final
}

public void onAddDestination(
final ReceiveChannelEndpoint channelEndpoint, final ReceiveDestinationUdpTransport transport)
final ReceiveChannelEndpoint channelEndpoint, final ReceiveDestinationTransport transport)
{
transport.openChannel(conductorProxy, channelEndpoint.statusIndicatorCounter());

Expand All @@ -194,10 +192,8 @@ public void onAddDestination(

if (transport.hasExplicitControl())
{
addPendingSetupMessage(
0, 0, transportIndex, channelEndpoint, true, transport.explicitControlAddress());
channelEndpoint.sendSetupElicitingStatusMessage(
transportIndex, transport.explicitControlAddress(), 0, 0);
addPendingSetupMessage(0, 0, transportIndex, channelEndpoint, true, transport.explicitControlAddress());
channelEndpoint.sendSetupElicitingStatusMessage(transportIndex, transport.explicitControlAddress(), 0, 0);
}

for (final PublicationImage image : publicationImages)
Expand All @@ -209,14 +205,13 @@ public void onAddDestination(
}
}

public void onRemoveDestination(
final ReceiveChannelEndpoint channelEndpoint, final UdpChannel udpChannel)
public void onRemoveDestination(final ReceiveChannelEndpoint channelEndpoint, final UdpChannel udpChannel)
{
final int transportIndex = channelEndpoint.destination(udpChannel);

if (ArrayUtil.UNKNOWN_INDEX != transportIndex)
{
final ReceiveDestinationUdpTransport transport = channelEndpoint.destination(transportIndex);
final ReceiveDestinationTransport transport = channelEndpoint.destination(transportIndex);

dataTransportPoller.cancelRead(channelEndpoint, transport);
channelEndpoint.removeDestination(transportIndex);
Expand Down
4 changes: 2 additions & 2 deletions aeron-driver/src/main/java/io/aeron/driver/ReceiverProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package io.aeron.driver;

import io.aeron.driver.media.ReceiveChannelEndpoint;
import io.aeron.driver.media.ReceiveDestinationUdpTransport;
import io.aeron.driver.media.ReceiveDestinationTransport;
import io.aeron.driver.media.UdpChannel;
import org.agrona.concurrent.status.AtomicCounter;

Expand Down Expand Up @@ -151,7 +151,7 @@ public void removeCoolDown(final ReceiveChannelEndpoint channelEndpoint, final i
}

public void addDestination(
final ReceiveChannelEndpoint channelEndpoint, final ReceiveDestinationUdpTransport transport)
final ReceiveChannelEndpoint channelEndpoint, final ReceiveDestinationTransport transport)
{
if (notConcurrent())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@

final class MultiRcvDestination
{
private static final ReceiveDestinationUdpTransport[] EMPTY_TRANSPORTS = new ReceiveDestinationUdpTransport[0];
private static final ReceiveDestinationTransport[] EMPTY_TRANSPORTS = new ReceiveDestinationTransport[0];

private final long destinationEndpointTimeoutNs;
private final ErrorHandler errorHandler;
private final NanoClock nanoClock;
private ReceiveDestinationUdpTransport[] transports = EMPTY_TRANSPORTS;
private ReceiveDestinationTransport[] transports = EMPTY_TRANSPORTS;
private int numDestinations = 0;

MultiRcvDestination(final NanoClock nanoClock, final long timeoutNs, final ErrorHandler errorHandler)
Expand All @@ -45,7 +45,7 @@ final class MultiRcvDestination

void close(final DataTransportPoller poller)
{
for (final ReceiveDestinationUdpTransport transport : transports)
for (final ReceiveDestinationTransport transport : transports)
{
AeronCloseHelper.close(errorHandler, transport);
if (null != poller)
Expand All @@ -55,7 +55,7 @@ void close(final DataTransportPoller poller)
}
}

int addDestination(final ReceiveDestinationUdpTransport transport)
int addDestination(final ReceiveDestinationTransport transport)
{
int index = transports.length;

Expand Down Expand Up @@ -86,19 +86,19 @@ boolean hasDestination(final int transportIndex)
return numDestinations > transportIndex && null != transports[transportIndex];
}

ReceiveDestinationUdpTransport transport(final int transportIndex)
ReceiveDestinationTransport transport(final int transportIndex)
{
return transports[transportIndex];
}

int transport(final UdpChannel udpChannel)
{
final ReceiveDestinationUdpTransport[] transports = this.transports;
final ReceiveDestinationTransport[] transports = this.transports;
int index = ArrayUtil.UNKNOWN_INDEX;

for (int i = 0, length = transports.length; i < length; i++)
{
final ReceiveDestinationUdpTransport transport = transports[i];
final ReceiveDestinationTransport transport = transports[i];

if (null != transport && transport.udpChannel().equals(udpChannel))
{
Expand All @@ -116,7 +116,7 @@ int sendToAll(
final int bufferPosition,
final int bytesToSend)
{
final ReceiveDestinationUdpTransport[] transports = this.transports;
final ReceiveDestinationTransport[] transports = this.transports;
final long nowNs = nanoClock.nanoTime();
int minBytesSent = bytesToSend;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

import static io.aeron.driver.media.UdpChannelTransport.sendError;

abstract class MultiDestination
abstract class MultiSndDestination
{
abstract int send(DatagramChannel channel, ByteBuffer buffer, SendChannelEndpoint channelEndpoint, int bytesToSend);

Expand Down Expand Up @@ -72,7 +72,7 @@ static int send(
}
}

class ManualMultiDestination extends MultiDestination
class ManualSndMultiDestination extends MultiSndDestination
{
private static final InetSocketAddress[] EMPTY_DESTINATIONS = new InetSocketAddress[0];

Expand Down Expand Up @@ -149,15 +149,15 @@ void removeDestination(final InetSocketAddress address)
}
}

class DynamicMultiDestination extends MultiDestination
class DynamicSndMultiDestination extends MultiSndDestination
{
private static final Destination[] EMPTY_DESTINATIONS = new Destination[0];

private final long destinationTimeoutNs;
private final CachedNanoClock nanoClock;
private Destination[] destinations = EMPTY_DESTINATIONS;

DynamicMultiDestination(final CachedNanoClock nanoClock, final long timeout)
DynamicSndMultiDestination(final CachedNanoClock nanoClock, final long timeout)
{
this.nanoClock = nanoClock;
this.destinationTimeoutNs = timeout;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ else if (0 == transportIndex)
}
}

public int addDestination(final ReceiveDestinationUdpTransport transport)
public int addDestination(final ReceiveDestinationTransport transport)
{
return multiRcvDestination.addDestination(transport);
}
Expand All @@ -372,7 +372,7 @@ public int destination(final UdpChannel udpChannel)
return multiRcvDestination.transport(udpChannel);
}

public ReceiveDestinationUdpTransport destination(final int transportIndex)
public ReceiveDestinationTransport destination(final int transportIndex)
{
return multiRcvDestination.transport(transportIndex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
/**
* Destination endpoint representation for reception into an image from a UDP transport.
*/
public class ReceiveDestinationUdpTransport extends UdpChannelTransport
public class ReceiveDestinationTransport extends UdpChannelTransport
{
public ReceiveDestinationUdpTransport(final UdpChannel udpChannel, final MediaDriver.Context context)
public ReceiveDestinationTransport(final UdpChannel udpChannel, final MediaDriver.Context context)
{
super(udpChannel, udpChannel.remoteData(), udpChannel.remoteData(), null, context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class SendChannelEndpoint extends UdpChannelTransport

private int refCount = 0;
private final BiInt2ObjectMap<NetworkPublication> publicationBySessionAndStreamId = new BiInt2ObjectMap<>();
private final MultiDestination multiDestination;
private final MultiSndDestination multiSndDestination;
private final AtomicCounter statusMessagesReceived;
private final AtomicCounter nakMessagesReceived;
private final AtomicCounter statusIndicator;
Expand All @@ -65,17 +65,17 @@ public SendChannelEndpoint(
statusMessagesReceived = context.systemCounters().get(STATUS_MESSAGES_RECEIVED);
this.statusIndicator = statusIndicator;

MultiDestination multiDestination = null;
MultiSndDestination multiSndDestination = null;
if (udpChannel.isManualControlMode())
{
multiDestination = new ManualMultiDestination();
multiSndDestination = new ManualSndMultiDestination();
}
else if (udpChannel.isDynamicControlMode() || udpChannel.hasExplicitControl())
{
multiDestination = new DynamicMultiDestination(context.cachedNanoClock(), DESTINATION_TIMEOUT);
multiSndDestination = new DynamicSndMultiDestination(context.cachedNanoClock(), DESTINATION_TIMEOUT);
}

this.multiDestination = multiDestination;
this.multiSndDestination = multiSndDestination;
}

public void decRef()
Expand Down Expand Up @@ -185,7 +185,7 @@ public int send(final ByteBuffer buffer)
{
final int bytesToSend = buffer.remaining();

if (null == multiDestination)
if (null == multiSndDestination)
{
try
{
Expand All @@ -205,7 +205,7 @@ public int send(final ByteBuffer buffer)
}
else
{
bytesSent = multiDestination.send(sendDatagramChannel, buffer, this, bytesToSend);
bytesSent = multiSndDestination.send(sendDatagramChannel, buffer, this, bytesToSend);
}
}

Expand All @@ -222,9 +222,9 @@ public void onStatusMessage(
final int streamId = msg.streamId();
final NetworkPublication publication = publicationBySessionAndStreamId.get(sessionId, streamId);

if (null != multiDestination)
if (null != multiSndDestination)
{
multiDestination.onStatusMessage(msg, srcAddress);
multiSndDestination.onStatusMessage(msg, srcAddress);

if (0 == sessionId && 0 == streamId && SEND_SETUP_FLAG == (msg.flags() & SEND_SETUP_FLAG))
{
Expand Down Expand Up @@ -279,19 +279,19 @@ public void onRttMeasurement(

public void validateAllowsManualControl()
{
if (!(multiDestination instanceof ManualMultiDestination))
if (!(multiSndDestination instanceof ManualSndMultiDestination))
{
throw new ControlProtocolException(ErrorCode.INVALID_CHANNEL, "channel does not allow manual control");
}
}

public void addDestination(final InetSocketAddress address)
{
multiDestination.addDestination(address);
multiSndDestination.addDestination(address);
}

public void removeDestination(final InetSocketAddress address)
{
multiDestination.removeDestination(address);
multiSndDestination.removeDestination(address);
}
}

0 comments on commit 12ab5de

Please sign in to comment.