Skip to content

Commit

Permalink
[Java] Use "reject" instead of "invalidate" for marking images for er…
Browse files Browse the repository at this point in the history
…rors.
  • Loading branch information
mikeb01 committed Jul 8, 2024
1 parent e2edd08 commit 5bddf84
Show file tree
Hide file tree
Showing 16 changed files with 70 additions and 70 deletions.
6 changes: 3 additions & 3 deletions aeron-agent/src/main/java/io/aeron/agent/CmdInterceptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class CmdInterceptor
CMD_IN_REMOVE_RCV_DESTINATION,
CMD_OUT_ON_CLIENT_TIMEOUT,
CMD_IN_TERMINATE_DRIVER,
CMD_IN_INVALIDATE_IMAGE);
CMD_IN_REJECT_IMAGE);

@SuppressWarnings("checkstyle:methodlength")
@Advice.OnMethodEnter
Expand Down Expand Up @@ -155,8 +155,8 @@ static void logCmd(final int msgTypeId, final DirectBuffer buffer, final int ind
LOGGER.log(CMD_IN_TERMINATE_DRIVER, buffer, index, length);
break;

case INVALIDATE_IMAGE:
LOGGER.log(CMD_IN_INVALIDATE_IMAGE, buffer, index, length);
case REJECT_IMAGE:
LOGGER.log(CMD_IN_REJECT_IMAGE, buffer, index, length);
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public enum DriverEventCode implements EventCode
RESEND(55,
(code, buffer, offset, builder) -> DriverEventDissector.dissectResend(buffer, offset, builder)),

CMD_IN_INVALIDATE_IMAGE(56, DriverEventDissector::dissectCommand);
CMD_IN_REJECT_IMAGE(56, DriverEventDissector::dissectCommand);


static final int EVENT_CODE_TYPE = EventCodeType.DRIVER.getTypeCode();
Expand Down
20 changes: 10 additions & 10 deletions aeron-agent/src/main/java/io/aeron/agent/DriverEventDissector.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ final class DriverEventDissector
private static final SubscriptionReadyFlyweight SUBSCRIPTION_READY = new SubscriptionReadyFlyweight();
private static final ClientTimeoutFlyweight CLIENT_TIMEOUT = new ClientTimeoutFlyweight();
private static final TerminateDriverFlyweight TERMINATE_DRIVER = new TerminateDriverFlyweight();
private static final InvalidateImageFlyweight INVALIDATE_IMAGE = new InvalidateImageFlyweight();
private static final RejectImageFlyweight REJECT_IMAGE = new RejectImageFlyweight();

static final String CONTEXT = "DRIVER";

Expand Down Expand Up @@ -215,9 +215,9 @@ static void dissectCommand(
dissectTerminateDriver(builder);
break;

case CMD_IN_INVALIDATE_IMAGE:
INVALIDATE_IMAGE.wrap(buffer, offset + encodedLength);
dissectInvalidateImage(builder);
case CMD_IN_REJECT_IMAGE:
REJECT_IMAGE.wrap(buffer, offset + encodedLength);
dissectRejectImage(builder);
break;

default:
Expand Down Expand Up @@ -773,13 +773,13 @@ private static void dissectTerminateDriver(final StringBuilder builder)
.append(" tokenBufferLength=").append(TERMINATE_DRIVER.tokenBufferLength());
}

private static void dissectInvalidateImage(final StringBuilder builder)
private static void dissectRejectImage(final StringBuilder builder)
{
builder
.append("clientId=").append(INVALIDATE_IMAGE.clientId())
.append(" correlationId=").append(INVALIDATE_IMAGE.correlationId())
.append(" imageCorrelationId=").append(INVALIDATE_IMAGE.imageCorrelationId())
.append(" position=").append(INVALIDATE_IMAGE.position())
.append(" reason=").append(INVALIDATE_IMAGE.reason());
.append("clientId=").append(REJECT_IMAGE.clientId())
.append(" correlationId=").append(REJECT_IMAGE.correlationId())
.append(" imageCorrelationId=").append(REJECT_IMAGE.imageCorrelationId())
.append(" position=").append(REJECT_IMAGE.position())
.append(" reason=").append(REJECT_IMAGE.reason());
}
}
4 changes: 2 additions & 2 deletions aeron-client/src/main/java/io/aeron/ClientConductor.java
Original file line number Diff line number Diff line change
Expand Up @@ -1317,7 +1317,7 @@ void closeImages(final Image[] images, final UnavailableImageHandler unavailable
}
}

void invalidateImage(final long correlationId, final long position, final String reason)
void rejectImage(final long correlationId, final long position, final String reason)
{
clientLock.lock();
try
Expand All @@ -1327,7 +1327,7 @@ void invalidateImage(final long correlationId, final long position, final String

// TODO, check reason length??

final long registrationId = driverProxy.invalidateImage(correlationId, position, reason);
final long registrationId = driverProxy.rejectImage(correlationId, position, reason);
awaitResponse(registrationId);
}
finally
Expand Down
14 changes: 7 additions & 7 deletions aeron-client/src/main/java/io/aeron/DriverProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public final class DriverProxy
private final RemoveMessageFlyweight removeMessage = new RemoveMessageFlyweight();
private final DestinationMessageFlyweight destinationMessage = new DestinationMessageFlyweight();
private final CounterMessageFlyweight counterMessage = new CounterMessageFlyweight();
private final InvalidateImageFlyweight invalidateImage = new InvalidateImageFlyweight();
private final RejectImageFlyweight rejectImage = new RejectImageFlyweight();
private final RingBuffer toDriverCommandBuffer;

/**
Expand Down Expand Up @@ -462,29 +462,29 @@ public boolean terminateDriver(final DirectBuffer tokenBuffer, final int tokenOf
}

/**
* Invalidate a specific image.
* Reject a specific image.
*
* @param imageCorrelationId of the image to be invalidated
* @param position of the image when invalidation occurred
* @param reason user supplied reason for invalidation, reported back to publication
* @return the correlationId of the request for invalidation.
*/
public long invalidateImage(
public long rejectImage(
final long imageCorrelationId,
final long position,
final String reason)
{
final int length = InvalidateImageFlyweight.computeLength(reason);
final int index = toDriverCommandBuffer.tryClaim(INVALIDATE_IMAGE, length);
final int length = RejectImageFlyweight.computeLength(reason);
final int index = toDriverCommandBuffer.tryClaim(REJECT_IMAGE, length);

if (index < 0)
{
throw new AeronException("could not write invalidate image command");
throw new AeronException("could not write reject image command");
}

final long correlationId = toDriverCommandBuffer.nextCorrelationId();

invalidateImage
rejectImage
.wrap(toDriverCommandBuffer.buffer(), index)
.clientId(clientId)
.correlationId(correlationId)
Expand Down
4 changes: 2 additions & 2 deletions aeron-client/src/main/java/io/aeron/Image.java
Original file line number Diff line number Diff line change
Expand Up @@ -793,9 +793,9 @@ public int rawPoll(final RawBlockHandler handler, final int blockLengthLimit)
*
* @param reason an error message to be forwarded back to the publication.
*/
public void invalidate(final String reason)
public void reject(final String reason)
{
subscription.invalidate(correlationId, position(), reason);
subscription.rejectImage(correlationId, position(), reason);
}

private UnsafeBuffer activeTermBuffer(final long position)
Expand Down
4 changes: 2 additions & 2 deletions aeron-client/src/main/java/io/aeron/Subscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -620,9 +620,9 @@ Image removeImage(final long correlationId)
return removedImage;
}

void invalidate(final long correlationId, final long position, final String reason)
void rejectImage(final long correlationId, final long position, final String reason)
{
conductor.invalidateImage(correlationId, position, reason);
conductor.rejectImage(correlationId, position, reason);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public class ControlProtocolEvents
/**
* Invalidate an image.
*/
public static final int INVALIDATE_IMAGE = 0x0F;
public static final int REJECT_IMAGE = 0x0F;

// Media Driver to Clients

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
* +---------------------------------------------------------------+
* </pre>
*/
public class InvalidateImageFlyweight extends CorrelatedMessageFlyweight
public class RejectImageFlyweight extends CorrelatedMessageFlyweight
{
private static final int IMAGE_CORRELATION_ID_FIELD_OFFSET = CORRELATION_ID_FIELD_OFFSET + SIZE_OF_LONG;
private static final int POSITION_FIELD_OFFSET = IMAGE_CORRELATION_ID_FIELD_OFFSET + SIZE_OF_LONG;
Expand All @@ -62,7 +62,7 @@ public class InvalidateImageFlyweight extends CorrelatedMessageFlyweight
* @param offset at which the message begins.
* @return this for a fluent API.
*/
public InvalidateImageFlyweight wrap(final MutableDirectBuffer buffer, final int offset)
public RejectImageFlyweight wrap(final MutableDirectBuffer buffer, final int offset)
{
super.wrap(buffer, offset);
return this;
Expand All @@ -84,7 +84,7 @@ public long imageCorrelationId()
* @param position new image correlation id value.
* @return this for a fluent API.
*/
public InvalidateImageFlyweight imageCorrelationId(final long position)
public RejectImageFlyweight imageCorrelationId(final long position)
{
buffer.putLong(offset + IMAGE_CORRELATION_ID_FIELD_OFFSET, position);
return this;
Expand All @@ -106,7 +106,7 @@ public long position()
* @param position new position value.
* @return this for a fluent API.
*/
public InvalidateImageFlyweight position(final long position)
public RejectImageFlyweight position(final long position)
{
buffer.putLong(offset + POSITION_FIELD_OFFSET, position);
return this;
Expand All @@ -118,7 +118,7 @@ public InvalidateImageFlyweight position(final long position)
* @param reason for invalidating the image.
* @return this for a fluent API.
*/
public InvalidateImageFlyweight reason(final String reason)
public RejectImageFlyweight reason(final String reason)
{
buffer.putStringAscii(offset + REASON_FIELD_OFFSET, reason);
return this;
Expand Down Expand Up @@ -149,7 +149,7 @@ public int reasonBufferLength()
/**
* {@inheritDoc}
*/
public InvalidateImageFlyweight clientId(final long clientId)
public RejectImageFlyweight clientId(final long clientId)
{
super.clientId(clientId);
return this;
Expand All @@ -158,7 +158,7 @@ public InvalidateImageFlyweight clientId(final long clientId)
/**
* {@inheritDoc}
*/
public InvalidateImageFlyweight correlationId(final long correlationId)
public RejectImageFlyweight correlationId(final long correlationId)
{
super.correlationId(correlationId);
return this;
Expand Down
6 changes: 3 additions & 3 deletions aeron-client/src/test/java/io/aeron/ImageTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ void shouldPollFragmentsToBoundedFragmentHandlerWithMaxPositionAboveIntMaxValue(
}

@Test
void shouldInvalidateFragment()
void shouldRejectFragment()
{
final int initialOffset = TERM_BUFFER_LENGTH - (ALIGNED_FRAME_LENGTH * 2);
final long initialPosition = computePosition(
Expand All @@ -625,9 +625,9 @@ void shouldInvalidateFragment()
assertEquals(initialPosition, image.position());

final String reason = "this is garbage";
image.invalidate(reason);
image.reject(reason);

verify(subscription).invalidate(image.correlationId(), image.position(), reason);
verify(subscription).rejectImage(image.correlationId(), image.position(), reason);

// final int fragmentsRead = image.boundedPoll(
// mockFragmentHandler, maxPosition, Integer.MAX_VALUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ final class ClientCommandAdapter implements ControlledMessageHandler
private final DestinationMessageFlyweight destinationMsgFlyweight = new DestinationMessageFlyweight();
private final CounterMessageFlyweight counterMsgFlyweight = new CounterMessageFlyweight();
private final TerminateDriverFlyweight terminateDriverFlyweight = new TerminateDriverFlyweight();
private final InvalidateImageFlyweight invalidateImageFlyweight = new InvalidateImageFlyweight();
private final RejectImageFlyweight rejectImageFlyweight = new RejectImageFlyweight();
private final DriverConductor conductor;
private final RingBuffer toDriverCommands;
private final ClientProxy clientProxy;
Expand Down Expand Up @@ -264,17 +264,17 @@ else if (channel.startsWith(SPY_QUALIFIER))
break;
}

case INVALIDATE_IMAGE:
case REJECT_IMAGE:
{
invalidateImageFlyweight.wrap(buffer, index);
invalidateImageFlyweight.validateLength(msgTypeId, length);
correlationId = invalidateImageFlyweight.correlationId();

conductor.onInvalidateImage(
invalidateImageFlyweight.correlationId(),
invalidateImageFlyweight.imageCorrelationId(),
invalidateImageFlyweight.position(),
invalidateImageFlyweight.reason());
rejectImageFlyweight.wrap(buffer, index);
rejectImageFlyweight.validateLength(msgTypeId, length);
correlationId = rejectImageFlyweight.correlationId();

conductor.onRejectImage(
rejectImageFlyweight.correlationId(),
rejectImageFlyweight.imageCorrelationId(),
rejectImageFlyweight.position(),
rejectImageFlyweight.reason());
break;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1418,7 +1418,7 @@ void onTerminateDriver(final DirectBuffer tokenBuffer, final int tokenOffset, fi
}
}

void onInvalidateImage(
void onRejectImage(
final long correlationId,
final long imageCorrelationId,
final long position,
Expand All @@ -1437,7 +1437,7 @@ void onInvalidateImage(
GENERIC_ERROR, "Unable to resolve image for correlationId=" + imageCorrelationId);
}

receiverProxy.invalidateImage(imageCorrelationId, position, reason);
receiverProxy.rejectImage(imageCorrelationId, position, reason);
clientProxy.operationSucceeded(correlationId);
}

Expand Down
12 changes: 6 additions & 6 deletions aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class PublicationImageReceiverFields extends PublicationImagePadding2
boolean isSendingEosSm = false;
long timeOfLastPacketNs;
ImageConnection[] imageConnections = new ImageConnection[1];
String invalidationReason = null;
String rejectionReason = null;
}

class PublicationImagePadding3 extends PublicationImageReceiverFields
Expand Down Expand Up @@ -590,7 +590,7 @@ int insertPacket(
{
final boolean isEndOfStream = DataHeaderFlyweight.isEndOfStream(buffer);

if (null != invalidationReason)
if (null != rejectionReason)
{
if (isEndOfStream)
{
Expand Down Expand Up @@ -707,12 +707,12 @@ int sendPendingStatusMessage(final long nowNs)
final long changeNumber = endSmChange;
final boolean hasSmTimedOut = (timeOfLastSmNs + smTimeoutNs) - nowNs < 0;

if (null != invalidationReason)
if (null != rejectionReason)
{
if (hasSmTimedOut)
{
channelEndpoint.sendErrorFrame(
imageConnections, sessionId, streamId, GENERIC_ERROR.value(), invalidationReason);
imageConnections, sessionId, streamId, GENERIC_ERROR.value(), rejectionReason);

timeOfLastSmNs = nowNs;
workCount++;
Expand Down Expand Up @@ -918,9 +918,9 @@ public boolean hasReachedEndOfLife()
return hasReceiverReleased && State.DONE == state;
}

void invalidate(final String reason)
void reject(final String reason)
{
invalidationReason = reason;
rejectionReason = reason;
}

private void state(final State state)
Expand Down
4 changes: 2 additions & 2 deletions aeron-driver/src/main/java/io/aeron/driver/Receiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -334,13 +334,13 @@ void onResolutionChange(
channelEndpoint.updateControlAddress(transportIndex, newAddress);
}

void onInvalidateImage(final long imageCorrelationId, final long position, final String reason)
void onRejectImage(final long imageCorrelationId, final long position, final String reason)
{
for (final PublicationImage image : publicationImages)
{
if (imageCorrelationId == image.correlationId())
{
image.invalidate(reason);
image.reject(reason);
break;
}
}
Expand Down
6 changes: 3 additions & 3 deletions aeron-driver/src/main/java/io/aeron/driver/ReceiverProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -196,15 +196,15 @@ void requestSetup(
}
}

void invalidateImage(final long imageCorrelationId, final long position, final String reason)
void rejectImage(final long imageCorrelationId, final long position, final String reason)
{
if (notConcurrent())
{
receiver.onInvalidateImage(imageCorrelationId, position, reason);
receiver.onRejectImage(imageCorrelationId, position, reason);
}
else
{
offer(() -> receiver.onInvalidateImage(imageCorrelationId, position, reason));
offer(() -> receiver.onRejectImage(imageCorrelationId, position, reason));
}
}
}
Loading

0 comments on commit 5bddf84

Please sign in to comment.